You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/04/07 23:59:25 UTC
[beam] branch master updated: [Go SDK]: Add support for reading gzip files in textio Read and ReadAll (#26163)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2f46231b6f0 [Go SDK]: Add support for reading gzip files in textio Read and ReadAll (#26163)
2f46231b6f0 is described below
commit 2f46231b6f00e06c85f6cf24aa699abcd416a97a
Author: Johanna Öjeling <51...@users.noreply.github.com>
AuthorDate: Sat Apr 8 01:59:17 2023 +0200
[Go SDK]: Add support for reading gzip files in textio Read and ReadAll (#26163)
* Refactor textio Read transforms to use fileio transforms
* Add support for reading gzip files in textio
---
sdks/go/data/textio_test.gz | Bin 0 -> 50 bytes
sdks/go/pkg/beam/io/textio/textio.go | 117 +++++++++++++++---------------
sdks/go/pkg/beam/io/textio/textio_test.go | 26 ++++++-
3 files changed, 83 insertions(+), 60 deletions(-)
diff --git a/sdks/go/data/textio_test.gz b/sdks/go/data/textio_test.gz
new file mode 100644
index 00000000000..a0c9b3ac2d3
Binary files /dev/null and b/sdks/go/data/textio_test.gz differ
diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go
index ed8be0a42b2..532a7ae0800 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
@@ -35,25 +36,56 @@ import (
func init() {
beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem())
- beam.RegisterFunction(expandFn)
+}
+
+type readOption struct {
+ FileOpts []fileio.ReadOptionFn
+}
+
+// ReadOptionFn is a function that can be passed to Read or ReadAll to configure options for
+// reading files.
+type ReadOptionFn func(*readOption)
+
+// ReadAutoCompression specifies that the compression type of files should be auto-detected.
+func ReadAutoCompression() ReadOptionFn {
+ return func(o *readOption) {
+ o.FileOpts = append(o.FileOpts, fileio.ReadAutoCompression())
+ }
+}
+
+// ReadGzip specifies that files have been compressed using gzip.
+func ReadGzip() ReadOptionFn {
+ return func(o *readOption) {
+ o.FileOpts = append(o.FileOpts, fileio.ReadGzip())
+ }
+}
+
+// ReadUncompressed specifies that files have not been compressed.
+func ReadUncompressed() ReadOptionFn {
+ return func(o *readOption) {
+ o.FileOpts = append(o.FileOpts, fileio.ReadUncompressed())
+ }
}
// Read reads a set of files indicated by the glob pattern and returns
-// the lines as a PCollection<string>.
-// The newlines are not part of the lines.
-func Read(s beam.Scope, glob string) beam.PCollection {
+// the lines as a PCollection<string>. The newlines are not part of the lines.
+// Read accepts a variadic number of ReadOptionFn that can be used to configure the compression
+// type of the file. By default, the compression type is determined by the file extension.
+func Read(s beam.Scope, glob string, opts ...ReadOptionFn) beam.PCollection {
s = s.Scope("textio.Read")
filesystem.ValidateScheme(glob)
- return read(s, beam.Create(s, glob))
+ return read(s, beam.Create(s, glob), opts...)
}
// ReadAll expands and reads the filename given as globs by the incoming
// PCollection<string>. It returns the lines of all files as a single
// PCollection<string>. The newlines are not part of the lines.
-func ReadAll(s beam.Scope, col beam.PCollection) beam.PCollection {
+// ReadAll accepts a variadic number of ReadOptionFn that can be used to configure the compression
+// type of the files. By default, the compression type is determined by the file extension.
+func ReadAll(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection {
s = s.Scope("textio.ReadAll")
- return read(s, col)
+ return read(s, col, opts...)
}
// ReadSdf is a variation of Read implemented via SplittableDoFn. This should
@@ -64,7 +96,7 @@ func ReadSdf(s beam.Scope, glob string) beam.PCollection {
s = s.Scope("textio.ReadSdf")
filesystem.ValidateScheme(glob)
- return read(s, beam.Create(s, glob))
+ return read(s, beam.Create(s, glob), ReadUncompressed())
}
// ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This
@@ -73,37 +105,21 @@ func ReadSdf(s beam.Scope, glob string) beam.PCollection {
// Deprecated: Use ReadAll instead, which has been migrated to use this SDF implementation.
func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
s = s.Scope("textio.ReadAllSdf")
- return read(s, col)
+ return read(s, col, ReadUncompressed())
}
// read takes a PCollection of globs and returns a PCollection of lines from
// all files in those globs. Uses an SDF to allow splitting reads of files
// into separate bundles.
-func read(s beam.Scope, col beam.PCollection) beam.PCollection {
- files := beam.ParDo(s, expandFn, col)
- return beam.ParDo(s, &readFn{}, files)
-}
-
-// expandFn expands a glob pattern into all matching file names.
-func expandFn(ctx context.Context, glob string, emit func(string)) error {
- if strings.TrimSpace(glob) == "" {
- return nil // ignore empty string elements here
+func read(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection {
+ option := &readOption{}
+ for _, opt := range opts {
+ opt(option)
}
- fs, err := filesystem.New(ctx, glob)
- if err != nil {
- return err
- }
- defer fs.Close()
-
- files, err := fs.List(ctx, glob)
- if err != nil {
- return err
- }
- for _, filename := range files {
- emit(filename)
- }
- return nil
+ matches := fileio.MatchAll(s, col, fileio.MatchEmptyAllow())
+ files := fileio.ReadMatches(s, matches, option.FileOpts...)
+ return beam.ParDo(s, &readFn{}, files)
}
// readFn reads individual lines from a text file. Implemented as an SDF
@@ -113,22 +129,11 @@ type readFn struct {
// CreateInitialRestriction creates an offset range restriction representing
// the file's size in bytes.
-func (fn *readFn) CreateInitialRestriction(ctx context.Context, filename string) (offsetrange.Restriction, error) {
- fs, err := filesystem.New(ctx, filename)
- if err != nil {
- return offsetrange.Restriction{}, err
- }
- defer fs.Close()
-
- size, err := fs.Size(ctx, filename)
- if err != nil {
- return offsetrange.Restriction{}, err
- }
-
+func (fn *readFn) CreateInitialRestriction(file fileio.ReadableFile) offsetrange.Restriction {
return offsetrange.Restriction{
Start: 0,
- End: size,
- }, nil
+ End: file.Metadata.Size,
+ }
}
const (
@@ -139,9 +144,9 @@ const (
tooSmall = blockSize / 4
)
-// SplitRestriction splits each file restriction into blocks of a predeterined
+// SplitRestriction splits each file restriction into blocks of a predetermined
// size, with some checks to avoid having small remainders.
-func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction) []offsetrange.Restriction {
+func (fn *readFn) SplitRestriction(_ fileio.ReadableFile, rest offsetrange.Restriction) []offsetrange.Restriction {
splits := rest.SizedSplits(blockSize)
numSplits := len(splits)
if numSplits > 1 {
@@ -155,8 +160,8 @@ func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction) []off
return splits
}
-// Size returns the size of each restriction as its range.
-func (fn *readFn) RestrictionSize(_ string, rest offsetrange.Restriction) float64 {
+// RestrictionSize returns the size of each restriction as its range.
+func (fn *readFn) RestrictionSize(_ fileio.ReadableFile, rest offsetrange.Restriction) float64 {
return rest.Size()
}
@@ -174,16 +179,10 @@ func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker
// begin within the restriction and past the restriction (those are entirely
// output, including the portion outside the restriction). In some cases a
// valid restriction might not output any lines.
-func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, emit func(string)) error {
- log.Infof(ctx, "Reading from %v", filename)
-
- fs, err := filesystem.New(ctx, filename)
- if err != nil {
- return err
- }
- defer fs.Close()
+func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, file fileio.ReadableFile, emit func(string)) error {
+ log.Infof(ctx, "Reading from %v", file.Metadata.Path)
- fd, err := fs.OpenRead(ctx, filename)
+ fd, err := file.Open(ctx)
if err != nil {
return err
}
diff --git a/sdks/go/pkg/beam/io/textio/textio_test.go b/sdks/go/pkg/beam/io/textio/textio_test.go
index 3a80f44cd4c..eb904d74789 100644
--- a/sdks/go/pkg/beam/io/textio/textio_test.go
+++ b/sdks/go/pkg/beam/io/textio/textio_test.go
@@ -20,6 +20,7 @@ import (
"context"
"errors"
"os"
+ "path/filepath"
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
@@ -28,7 +29,12 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
-const testFilePath = "../../../../data/textio_test.txt"
+const testDir = "../../../../data"
+
+var (
+ testFilePath = filepath.Join(testDir, "textio_test.txt")
+ testGzFilePath = filepath.Join(testDir, "textio_test.gz")
+)
func TestRead(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
@@ -38,6 +44,15 @@ func TestRead(t *testing.T) {
ptest.RunAndValidate(t, p)
}
+func TestReadGzip(t *testing.T) {
+ p, s := beam.NewPipelineWithRoot()
+ got := Read(s, testGzFilePath, ReadGzip())
+ want := []any{"hello", "go"}
+
+ passert.Equals(s, got, want...)
+ ptest.RunAndValidate(t, p)
+}
+
func TestReadAll(t *testing.T) {
p, s, files := ptest.CreateList([]string{testFilePath})
lines := ReadAll(s, files)
@@ -46,6 +61,15 @@ func TestReadAll(t *testing.T) {
ptest.RunAndValidate(t, p)
}
+func TestReadAllGzip(t *testing.T) {
+ p, s, files := ptest.CreateList([]string{testGzFilePath})
+ got := ReadAll(s, files, ReadGzip())
+ want := []any{"hello", "go"}
+
+ passert.Equals(s, got, want...)
+ ptest.RunAndValidate(t, p)
+}
+
func TestWrite(t *testing.T) {
out := "text.txt"
p, s := beam.NewPipelineWithRoot()