You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2020/10/29 06:29:11 UTC
[beam] branch master updated: [BEAM-11108] Add a version of TextIO
implemented via SDF. (#13188)
This is an automated email from the ASF dual-hosted git repository.
danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ce190e1 [BEAM-11108] Add a version of TextIO implemented via SDF. (#13188)
ce190e1 is described below
commit ce190e11332469ea59b6c9acf16ee7c673ccefdd
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Wed Oct 28 23:28:08 2020 -0700
[BEAM-11108] Add a version of TextIO implemented via SDF. (#13188)
Pretty straightforward. Read and ReadSdf should be functionally identical, and likewise for ReadAll and ReadAllSdf. Also adds a new OffsetRange method with unit tests.
---
sdks/go/examples/stringsplit/stringsplit.go | 2 +-
sdks/go/examples/wordcount/wordcount.go | 8 +
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 2 +-
.../beam/io/rtrackers/offsetrange/offsetrange.go | 29 ++-
.../io/rtrackers/offsetrange/offsetrange_test.go | 97 +++++++++-
sdks/go/pkg/beam/io/textio/sdf.go | 203 +++++++++++++++++++++
sdks/go/pkg/beam/io/textio/sdf_test.go | 39 ++++
sdks/go/pkg/beam/io/textio/textio.go | 2 +-
8 files changed, 372 insertions(+), 10 deletions(-)
diff --git a/sdks/go/examples/stringsplit/stringsplit.go b/sdks/go/examples/stringsplit/stringsplit.go
index e7ea230..13a1905 100644
--- a/sdks/go/examples/stringsplit/stringsplit.go
+++ b/sdks/go/examples/stringsplit/stringsplit.go
@@ -39,11 +39,11 @@ package main
import (
"context"
"flag"
- "github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
"reflect"
"time"
"github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
index fabaa60..d41ad01 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -101,6 +101,14 @@ var (
// output any number of elements. It operates on a PCollection of type string and
// returns a PCollection of type string. Also, using named function transforms allows
// for easy reuse, modular testing, and an improved monitoring experience.
+//
+// Before a DoFn can run inside a ParDo it has to be registered with Beam. The
+// starcgen code generator can do this automatically; the manual alternative,
+// used here, is to call beam.RegisterFunction from an init() function.
+func init() {
+	for _, fn := range []interface{}{extractFn, formatFn} {
+		beam.RegisterFunction(fn)
+	}
+}
var (
wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index a52dd43..8603185 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -123,7 +123,7 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) {
if len(edges) == 0 {
return nil, errors.New("empty graph")
}
-
+
tree := NewScopeTree(edges)
m := newMarshaller(opt)
diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
index e36692a..3c76725 100644
--- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
+++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
@@ -61,9 +61,10 @@ type Restriction struct {
Start, End int64
}
-// EvenSplits splits a restriction into a number of evenly sized restrictions.
-// Each split restriction is guaranteed to not be empty, and each unit from the
-// original restriction is guaranteed to be contained in one split restriction.
+// EvenSplits splits a restriction into a number of evenly sized restrictions
+// in ascending order. Each split restriction is guaranteed to not be empty, and
+// each unit from the original restriction is guaranteed to be contained in one
+// split restriction.
//
// Num should be greater than 0. Otherwise there is no way to split the
// restriction and this function will return the original restriction.
@@ -89,6 +90,28 @@ func (r Restriction) EvenSplits(num int64) (splits []Restriction) {
return splits
}
+// SizedSplits splits a restriction into multiple restrictions of the given
+// size, in ascending order. If the restriction cannot be evenly split, the
+// final restriction will be the remainder. An empty restriction yields a
+// single (empty) restriction equal to the original.
+//
+// Example: (0, 24) split with size 10 -> {(0, 10), (10, 20), (20, 24)}
+//
+// Size should be greater than 0. Otherwise there is no way to split the
+// restriction and this function will return the original restriction.
+func (r Restriction) SizedSplits(size int64) (splits []Restriction) {
+	if size < 1 {
+		// Don't split, just return original restriction.
+		return append(splits, r)
+	}
+
+	// Compare the remaining length against size rather than computing
+	// s+size up front: for very large sizes s+size would overflow int64,
+	// wrap negative, and still compare less than r.End.
+	s := r.Start
+	for ; r.End-s > size; s += size {
+		splits = append(splits, Restriction{Start: s, End: s + size})
+	}
+	splits = append(splits, Restriction{Start: s, End: r.End})
+	return splits
+}
+
// Size returns the restriction's size as the difference between Start and End.
func (r Restriction) Size() float64 {
return float64(r.End - r.Start)
diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
index 489e251..ba7da64 100644
--- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
+++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
@@ -63,10 +63,99 @@ func TestRestriction_EvenSplits(t *testing.T) {
t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v or %v",
split.Start, split.End, size, min, min+1)
}
- // Check: All elements are still in a split restrictions. This
- // logic assumes that the splits are returned in order which
- // isn't guaranteed by EvenSplits, but this check is way easier
- // with the assumption.
+ // Check: All elements are still in a split restriction and
+ // the restrictions are in the appropriate ascending order.
+ if split.Start != prevEnd {
+ t.Errorf("restriction range [%v, %v] missing after splits.",
+ prevEnd, split.Start)
+ } else {
+ prevEnd = split.End
+ }
+ }
+ if prevEnd != r.End {
+ t.Errorf("restriction range [%v, %v] missing after splits.",
+ prevEnd, r.End)
+ }
+ })
+ }
+}
+
+// TestRestriction_SizedSplits tests various splits and checks that they all
+// follow the contract for SizedSplits. This means that all restrictions match
+// the given size unless it is a remainder, and that each element is present
+// in the split restrictions.
+func TestRestriction_SizedSplits(t *testing.T) {
+ tests := []struct {
+ name string
+ rest Restriction
+ size int64
+ want []Restriction
+ }{
+ {
+ name: "Remainder",
+ rest: Restriction{Start: 0, End: 11},
+ size: 5,
+ want: []Restriction{{0, 5}, {5, 10}, {10, 11}},
+ },
+ {
+ name: "OffsetRemainder",
+ rest: Restriction{Start: 11, End: 22},
+ size: 5,
+ want: []Restriction{{11, 16}, {16, 21}, {21, 22}},
+ },
+ {
+ name: "OffsetExact",
+ rest: Restriction{Start: 11, End: 21},
+ size: 5,
+ want: []Restriction{{11, 16}, {16, 21}},
+ },
+ {
+ name: "LargeValues",
+ rest: Restriction{Start: 0, End: 1024 * 1024 * 1024},
+ size: 400 * 1024 * 1024,
+ want: []Restriction{
+ {0, 400 * 1024 * 1024},
+ {400 * 1024 * 1024, 800 * 1024 * 1024},
+ {800 * 1024 * 1024, 1024 * 1024 * 1024},
+ },
+ },
+ {
+ name: "OverlyLargeSize",
+ rest: Restriction{Start: 0, End: 5},
+ size: 10,
+ want: []Restriction{{0, 5}},
+ },
+ {
+ name: "InvalidSize",
+ rest: Restriction{Start: 0, End: 21},
+ size: 0,
+ want: []Restriction{{0, 21}},
+ },
+ }
+ for _, test := range tests {
+ test := test
+ t.Run(fmt.Sprintf("%v (rest[%v, %v], size = %v)",
+ test.name, test.rest.Start, test.rest.End, test.size), func(t *testing.T) {
+ r := test.rest
+
+ // Get the minimum size that a split restriction can be. Max size
+ // should be min + 1. This way we can check the size of each split.
+ splits := r.SizedSplits(test.size)
+ prevEnd := r.Start
+ for i, split := range splits {
+ size := split.End - split.Start
+ // Check: Each restriction has at least 1 element.
+ if size == 0 {
+ t.Errorf("split restriction [%v, %v] is empty, size must be greater than 0.",
+ split.Start, split.End)
+ }
+ // Check: Restrictions (except for the last one) must match the split size.
+ if i != len(splits)-1 && size != test.size {
+ t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v",
+ split.Start, split.End, size, test.size)
+ }
+ // Check: All elements are still in a split restriction and
+ // the restrictions are in the appropriate ascending order.
if split.Start != prevEnd {
t.Errorf("restriction range [%v, %v] missing after splits.",
prevEnd, split.Start)
diff --git a/sdks/go/pkg/beam/io/textio/sdf.go b/sdks/go/pkg/beam/io/textio/sdf.go
new file mode 100644
index 0000000..bfccca9
--- /dev/null
+++ b/sdks/go/pkg/beam/io/textio/sdf.go
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package textio
+
+import (
+ "bufio"
+ "context"
+ "io"
+ "reflect"
+ "strings"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
+ "github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
+ "github.com/apache/beam/sdks/go/pkg/beam/log"
+)
+
+func init() {
+	// readSdfFn is a struct-based DoFn, so its type is registered; sizeFn is
+	// a plain function and is registered as a function.
+	sdfFnType := reflect.TypeOf((*readSdfFn)(nil)).Elem()
+	beam.RegisterType(sdfFnType)
+	beam.RegisterFunction(sizeFn)
+}
+
+// ReadSdf is a variation of Read implemented via SplittableDoFn. It reads the
+// lines of every file matching glob. Because files are processed as byte
+// ranges, runners that support splitting should see better performance.
+func ReadSdf(s beam.Scope, glob string) beam.PCollection {
+	s = s.Scope("textio.ReadSdf")
+	filesystem.ValidateScheme(glob)
+	globs := beam.Create(s, glob)
+	return readSdf(s, globs)
+}
+
+// ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. col is
+// a PCollection of globs; the result holds the lines of every matching file.
+// Runners that support splitting should see better performance than ReadAll.
+func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
+	return readSdf(s.Scope("textio.ReadAllSdf"), col)
+}
+
+// readSdf turns a PCollection of globs into a PCollection of the lines of all
+// files in those globs. Globs are expanded with expandFn, each file is paired
+// with its byte size by sizeFn, and the pairs are read by the splittable
+// readSdfFn (unlike textio.read, which reads each file in a single DoFn call).
+func readSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
+	filenames := beam.ParDo(s, expandFn, col)
+	withSizes := beam.ParDo(s, sizeFn, filenames)
+	return beam.ParDo(s, &readSdfFn{}, withSizes)
+}
+
+// sizeFn pairs a filename with the size of that file in bytes.
+// TODO(BEAM-11109): Once CreateInitialRestriction supports Context params and
+// error return values, this can be done in readSdfFn.CreateInitialRestriction.
+func sizeFn(ctx context.Context, filename string) (string, int64, error) {
+ fs, err := filesystem.New(ctx, filename)
+ if err != nil {
+ return "", -1, err
+ }
+ defer fs.Close()
+
+ size, err := fs.Size(ctx, filename)
+ if err != nil {
+ return "", -1, err
+ }
+ return filename, size, nil
+}
+
+// readSdfFn reads individual lines from a text file, given a filename and a
+// size in bytes for that file.
+type readSdfFn struct {
+}
+
+// CreateInitialRestriction returns the offset range [0, size) covering the
+// whole file. The size paired with the filename is used as-is, so the file's
+// size is not fetched again here.
+func (fn *readSdfFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction {
+	return offsetrange.Restriction{Start: 0, End: size}
+}
+
+const (
+	// blockSize is the target size, in bytes, of each block produced by the
+	// initial split of a file (64 MiB).
+	blockSize int64 = 64 << 20
+	// tooSmall is the remainder threshold: if the last block of a split is
+	// at most this many bytes (a quarter of blockSize), it is merged into the
+	// block before it.
+	tooSmall = blockSize / 4
+)
+
+// SplitRestriction divides a file's restriction into blocks of blockSize
+// bytes. When the trailing remainder block is at most tooSmall bytes, it is
+// folded into the preceding block so no tiny final block is produced.
+func (fn *readSdfFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction {
+	splits := rest.SizedSplits(blockSize)
+	if len(splits) < 2 {
+		return splits
+	}
+	lastIdx := len(splits) - 1
+	if tail := splits[lastIdx]; tail.End-tail.Start <= tooSmall {
+		// Stretch the second-to-last block over the small tail, then drop the tail.
+		splits[lastIdx-1].End = tail.End
+		splits = splits[:lastIdx]
+	}
+	return splits
+}
+
+// RestrictionSize reports the size of a restriction as the number of bytes
+// it spans, End - Start.
+func (fn *readSdfFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 {
+	return float64(rest.End - rest.Start)
+}
+
+// CreateTracker builds an offsetrange.Tracker for rest and wraps it in an
+// sdf.LockRTracker (presumably so the runner can access the tracker safely
+// while ProcessElement is claiming positions — confirm against sdf docs).
+func (fn *readSdfFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	tracker := offsetrange.NewTracker(rest)
+	return sdf.NewLockRTracker(tracker)
+}
+
+// ProcessElement outputs all lines in the file that begin within the paired
+// restriction.
+//
+// Note that restrictions may not align perfectly with lines. So lines can begin
+// before the restriction and end within it (those are ignored), and lines can
+// begin within the restriction and past the restriction (those are entirely
+// output, including the portion outside the restriction). In some cases a
+// valid restriction might not output any lines.
+func (fn *readSdfFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error {
+ log.Infof(ctx, "Reading from %v", filename)
+
+ fs, err := filesystem.New(ctx, filename)
+ if err != nil {
+ return err
+ }
+ defer fs.Close()
+
+ fd, err := fs.OpenRead(ctx, filename)
+ if err != nil {
+ return err
+ }
+ defer fd.Close()
+
+ rd := bufio.NewReader(fd)
+
+ i := rt.GetRestriction().(offsetrange.Restriction).Start
+ if i > 0 {
+ // If restriction's starts after 0, we cannot assume a new line starts
+ // at the beginning of the restriction, so we must search for the first
+ // line beginning at or after restriction.Start. This is done by
+ // scanning to the byte just before the restriction and then reading
+ // until the next newline, leaving the reader at the start of a new
+ // line past restriction.Start.
+ i -= 1
+ n, err := rd.Discard(int(i)) // Scan to just before restriction.
+ if err == io.EOF {
+ return errors.Errorf("TextIO restriction lies outside the file being read. "+
+ "Restriction begins at %v bytes, but file is only %v bytes.", i+1, n)
+ }
+ if err != nil {
+ return err
+ }
+ line, err := rd.ReadString('\n') // Read until the first line within the restriction.
+ if err == io.EOF {
+ // No lines start in the restriction but it's still valid, so
+ // finish claiming before returning to avoid errors.
+ rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End)
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ i += int64(len(line))
+ }
+
+ // Claim each line until we claim a line outside the restriction.
+ for rt.TryClaim(i) {
+ line, err := rd.ReadString('\n')
+ if err == io.EOF {
+ if len(line) != 0 {
+ emit(strings.TrimSuffix(line, "\n"))
+ }
+ // Finish claiming restriction before breaking to avoid errors.
+ rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End)
+ break
+ }
+ if err != nil {
+ return err
+ }
+ emit(strings.TrimSuffix(line, "\n"))
+ i += int64(len(line))
+ }
+ return nil
+}
diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go b/sdks/go/pkg/beam/io/textio/sdf_test.go
new file mode 100644
index 0000000..05a26fa
--- /dev/null
+++ b/sdks/go/pkg/beam/io/textio/sdf_test.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package textio
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ _ "github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
+ "github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+)
+
+// TestReadSdf tests that readSdf successfully reads a test text file, and
+// outputs the correct number of lines for it, even for an exceedingly long
+// line.
+func TestReadSdf(t *testing.T) {
+ f := "../../../../data/textio_test.txt"
+ p, s := beam.NewPipelineWithRoot()
+ lines := ReadSdf(s, f)
+ passert.Count(s, lines, "NumLines", 1)
+
+ if err := beam.Run(context.Background(), "direct", p); err != nil {
+ t.Fatalf("Failed to execute job: %v", err)
+ }
+}
diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go
index 67a006c..e8bc7ed 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -104,7 +104,7 @@ func readFn(ctx context.Context, filename string, emit func(string)) error {
break
}
if err != nil {
- return (err)
+ return err
}
emit(strings.TrimSuffix(line, "\n"))
}