Posted to commits@beam.apache.org by lo...@apache.org on 2023/04/07 23:59:25 UTC

[beam] branch master updated: [Go SDK]: Add support for reading gzip files in textio Read and ReadAll (#26163)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f46231b6f0 [Go SDK]: Add support for reading gzip files in textio Read and ReadAll (#26163)
2f46231b6f0 is described below

commit 2f46231b6f00e06c85f6cf24aa699abcd416a97a
Author: Johanna Öjeling <51...@users.noreply.github.com>
AuthorDate: Sat Apr 8 01:59:17 2023 +0200

    [Go SDK]: Add support for reading gzip files in textio Read and ReadAll (#26163)
    
    * Refactor textio Read transforms to use fileio transforms
    
    * Add support for reading gzip files in textio
---
 sdks/go/data/textio_test.gz               | Bin 0 -> 50 bytes
 sdks/go/pkg/beam/io/textio/textio.go      | 117 +++++++++++++++---------------
 sdks/go/pkg/beam/io/textio/textio_test.go |  26 ++++++-
 3 files changed, 83 insertions(+), 60 deletions(-)
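
For reference, a minimal usage sketch of the new option (not part of the commit): it assumes a hypothetical gzip-compressed input path and runs the pipeline with beamx; only textio.Read/ReadAll and the ReadGzip() option shown here come from this change. By default, when no option is passed, the compression type is still inferred from the file extension.

    package main

    import (
    	"context"
    	"flag"
    	"log"

    	"github.com/apache/beam/sdks/v2/go/pkg/beam"
    	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
    )

    func main() {
    	flag.Parse()
    	beam.Init()

    	p, s := beam.NewPipelineWithRoot()

    	// Read gzip-compressed text files; each emitted string is one line
    	// without the trailing newline. The input glob is a placeholder.
    	lines := textio.Read(s, "gs://my-bucket/logs/*.gz", textio.ReadGzip())

    	// Print the lines for illustration; a real pipeline would transform
    	// or write them instead.
    	debug.Print(s, lines)

    	if err := beamx.Run(context.Background(), p); err != nil {
    		log.Fatalf("Failed to execute job: %v", err)
    	}
    }

ReadAll accepts the same variadic options, e.g. textio.ReadAll(s, files, textio.ReadGzip()) for a PCollection of gzip file globs.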

diff --git a/sdks/go/data/textio_test.gz b/sdks/go/data/textio_test.gz
new file mode 100644
index 00000000000..a0c9b3ac2d3
Binary files /dev/null and b/sdks/go/data/textio_test.gz differ
diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go
index ed8be0a42b2..532a7ae0800 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
@@ -35,25 +36,56 @@ import (
 func init() {
 	beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem())
 	beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem())
-	beam.RegisterFunction(expandFn)
+}
+
+type readOption struct {
+	FileOpts []fileio.ReadOptionFn
+}
+
+// ReadOptionFn is a function that can be passed to Read or ReadAll to configure options for
+// reading files.
+type ReadOptionFn func(*readOption)
+
+// ReadAutoCompression specifies that the compression type of files should be auto-detected.
+func ReadAutoCompression() ReadOptionFn {
+	return func(o *readOption) {
+		o.FileOpts = append(o.FileOpts, fileio.ReadAutoCompression())
+	}
+}
+
+// ReadGzip specifies that files have been compressed using gzip.
+func ReadGzip() ReadOptionFn {
+	return func(o *readOption) {
+		o.FileOpts = append(o.FileOpts, fileio.ReadGzip())
+	}
+}
+
+// ReadUncompressed specifies that files have not been compressed.
+func ReadUncompressed() ReadOptionFn {
+	return func(o *readOption) {
+		o.FileOpts = append(o.FileOpts, fileio.ReadUncompressed())
+	}
 }
 
 // Read reads a set of files indicated by the glob pattern and returns
-// the lines as a PCollection<string>.
-// The newlines are not part of the lines.
-func Read(s beam.Scope, glob string) beam.PCollection {
+// the lines as a PCollection<string>. The newlines are not part of the lines.
+// Read accepts a variadic number of ReadOptionFn that can be used to configure the compression
+// type of the file. By default, the compression type is determined by the file extension.
+func Read(s beam.Scope, glob string, opts ...ReadOptionFn) beam.PCollection {
 	s = s.Scope("textio.Read")
 
 	filesystem.ValidateScheme(glob)
-	return read(s, beam.Create(s, glob))
+	return read(s, beam.Create(s, glob), opts...)
 }
 
 // ReadAll expands and reads the filename given as globs by the incoming
 // PCollection<string>. It returns the lines of all files as a single
 // PCollection<string>. The newlines are not part of the lines.
-func ReadAll(s beam.Scope, col beam.PCollection) beam.PCollection {
+// ReadAll accepts a variadic number of ReadOptionFn that can be used to configure the compression
+// type of the files. By default, the compression type is determined by the file extension.
+func ReadAll(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection {
 	s = s.Scope("textio.ReadAll")
-	return read(s, col)
+	return read(s, col, opts...)
 }
 
 // ReadSdf is a variation of Read implemented via SplittableDoFn. This should
@@ -64,7 +96,7 @@ func ReadSdf(s beam.Scope, glob string) beam.PCollection {
 	s = s.Scope("textio.ReadSdf")
 
 	filesystem.ValidateScheme(glob)
-	return read(s, beam.Create(s, glob))
+	return read(s, beam.Create(s, glob), ReadUncompressed())
 }
 
 // ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This
@@ -73,37 +105,21 @@ func ReadSdf(s beam.Scope, glob string) beam.PCollection {
 // Deprecated: Use ReadAll instead, which has been migrated to use this SDF implementation.
 func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
 	s = s.Scope("textio.ReadAllSdf")
-	return read(s, col)
+	return read(s, col, ReadUncompressed())
 }
 
 // read takes a PCollection of globs and returns a PCollection of lines from
 // all files in those globs. Uses an SDF to allow splitting reads of files
 // into separate bundles.
-func read(s beam.Scope, col beam.PCollection) beam.PCollection {
-	files := beam.ParDo(s, expandFn, col)
-	return beam.ParDo(s, &readFn{}, files)
-}
-
-// expandFn expands a glob pattern into all matching file names.
-func expandFn(ctx context.Context, glob string, emit func(string)) error {
-	if strings.TrimSpace(glob) == "" {
-		return nil // ignore empty string elements here
+func read(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection {
+	option := &readOption{}
+	for _, opt := range opts {
+		opt(option)
 	}
 
-	fs, err := filesystem.New(ctx, glob)
-	if err != nil {
-		return err
-	}
-	defer fs.Close()
-
-	files, err := fs.List(ctx, glob)
-	if err != nil {
-		return err
-	}
-	for _, filename := range files {
-		emit(filename)
-	}
-	return nil
+	matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow())
+	files := fileio.ReadMatches(s, matches, option.FileOpts...)
+	return beam.ParDo(s, &readFn{}, files)
 }
 
 // readFn reads individual lines from a text file. Implemented as an SDF
@@ -113,22 +129,11 @@ type readFn struct {
 
 // CreateInitialRestriction creates an offset range restriction representing
 // the file's size in bytes.
-func (fn *readFn) CreateInitialRestriction(ctx context.Context, filename string) (offsetrange.Restriction, error) {
-	fs, err := filesystem.New(ctx, filename)
-	if err != nil {
-		return offsetrange.Restriction{}, err
-	}
-	defer fs.Close()
-
-	size, err := fs.Size(ctx, filename)
-	if err != nil {
-		return offsetrange.Restriction{}, err
-	}
-
+func (fn *readFn) CreateInitialRestriction(file fileio.ReadableFile) offsetrange.Restriction {
 	return offsetrange.Restriction{
 		Start: 0,
-		End:   size,
-	}, nil
+		End:   file.Metadata.Size,
+	}
 }
 
 const (
@@ -139,9 +144,9 @@ const (
 	tooSmall = blockSize / 4
 )
 
-// SplitRestriction splits each file restriction into blocks of a predeterined
+// SplitRestriction splits each file restriction into blocks of a predetermined
 // size, with some checks to avoid having small remainders.
-func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction) []offsetrange.Restriction {
+func (fn *readFn) SplitRestriction(_ fileio.ReadableFile, rest offsetrange.Restriction) []offsetrange.Restriction {
 	splits := rest.SizedSplits(blockSize)
 	numSplits := len(splits)
 	if numSplits > 1 {
@@ -155,8 +160,8 @@ func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction) []off
 	return splits
 }
 
-// Size returns the size of each restriction as its range.
-func (fn *readFn) RestrictionSize(_ string, rest offsetrange.Restriction) float64 {
+// RestrictionSize returns the size of each restriction as its range.
+func (fn *readFn) RestrictionSize(_ fileio.ReadableFile, rest offsetrange.Restriction) float64 {
 	return rest.Size()
 }
 
@@ -174,16 +179,10 @@ func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker
 // begin within the restriction and past the restriction (those are entirely
 // output, including the portion outside the restriction). In some cases a
 // valid restriction might not output any lines.
-func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, emit func(string)) error {
-	log.Infof(ctx, "Reading from %v", filename)
-
-	fs, err := filesystem.New(ctx, filename)
-	if err != nil {
-		return err
-	}
-	defer fs.Close()
+func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, file fileio.ReadableFile, emit func(string)) error {
+	log.Infof(ctx, "Reading from %v", file.Metadata.Path)
 
-	fd, err := fs.OpenRead(ctx, filename)
+	fd, err := file.Open(ctx)
 	if err != nil {
 		return err
 	}
diff --git a/sdks/go/pkg/beam/io/textio/textio_test.go b/sdks/go/pkg/beam/io/textio/textio_test.go
index 3a80f44cd4c..eb904d74789 100644
--- a/sdks/go/pkg/beam/io/textio/textio_test.go
+++ b/sdks/go/pkg/beam/io/textio/textio_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"os"
+	"path/filepath"
 	"testing"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
@@ -28,7 +29,12 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
 )
 
-const testFilePath = "../../../../data/textio_test.txt"
+const testDir = "../../../../data"
+
+var (
+	testFilePath   = filepath.Join(testDir, "textio_test.txt")
+	testGzFilePath = filepath.Join(testDir, "textio_test.gz")
+)
 
 func TestRead(t *testing.T) {
 	p, s := beam.NewPipelineWithRoot()
@@ -38,6 +44,15 @@ func TestRead(t *testing.T) {
 	ptest.RunAndValidate(t, p)
 }
 
+func TestReadGzip(t *testing.T) {
+	p, s := beam.NewPipelineWithRoot()
+	got := Read(s, testGzFilePath, ReadGzip())
+	want := []any{"hello", "go"}
+
+	passert.Equals(s, got, want...)
+	ptest.RunAndValidate(t, p)
+}
+
 func TestReadAll(t *testing.T) {
 	p, s, files := ptest.CreateList([]string{testFilePath})
 	lines := ReadAll(s, files)
@@ -46,6 +61,15 @@ func TestReadAll(t *testing.T) {
 	ptest.RunAndValidate(t, p)
 }
 
+func TestReadAllGzip(t *testing.T) {
+	p, s, files := ptest.CreateList([]string{testGzFilePath})
+	got := ReadAll(s, files, ReadGzip())
+	want := []any{"hello", "go"}
+
+	passert.Equals(s, got, want...)
+	ptest.RunAndValidate(t, p)
+}
+
 func TestWrite(t *testing.T) {
 	out := "text.txt"
 	p, s := beam.NewPipelineWithRoot()