You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2020/10/29 06:29:11 UTC

[beam] branch master updated: [BEAM-11108] Add a version of TextIO implemented via SDF. (#13188)

This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ce190e1  [BEAM-11108] Add a version of TextIO implemented via SDF. (#13188)
ce190e1 is described below

commit ce190e11332469ea59b6c9acf16ee7c673ccefdd
Author: Daniel Oliveira <da...@gmail.com>
AuthorDate: Wed Oct 28 23:28:08 2020 -0700

    [BEAM-11108] Add a version of TextIO implemented via SDF. (#13188)
    
    Pretty straightforward. Read and ReadSdf should be functionally identical, and likewise for ReadAll and ReadAllSdf. Also adds a new OffsetRange method with unit tests.
---
 sdks/go/examples/stringsplit/stringsplit.go        |   2 +-
 sdks/go/examples/wordcount/wordcount.go            |   8 +
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |   2 +-
 .../beam/io/rtrackers/offsetrange/offsetrange.go   |  29 ++-
 .../io/rtrackers/offsetrange/offsetrange_test.go   |  97 +++++++++-
 sdks/go/pkg/beam/io/textio/sdf.go                  | 203 +++++++++++++++++++++
 sdks/go/pkg/beam/io/textio/sdf_test.go             |  39 ++++
 sdks/go/pkg/beam/io/textio/textio.go               |   2 +-
 8 files changed, 372 insertions(+), 10 deletions(-)

diff --git a/sdks/go/examples/stringsplit/stringsplit.go b/sdks/go/examples/stringsplit/stringsplit.go
index e7ea230..13a1905 100644
--- a/sdks/go/examples/stringsplit/stringsplit.go
+++ b/sdks/go/examples/stringsplit/stringsplit.go
@@ -39,11 +39,11 @@ package main
 import (
 	"context"
 	"flag"
-	"github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
 	"reflect"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
 	"github.com/apache/beam/sdks/go/pkg/beam/log"
 	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
index fabaa60..d41ad01 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -101,6 +101,14 @@ var (
 // output any number of elements. It operates on a PCollection of type string and
 // returns a PCollection of type string. Also, using named function transforms allows
 // for easy reuse, modular testing, and an improved monitoring experience.
+//
+// DoFns must be registered with Beam in order to be executed in ParDos. This is
+// done automatically by the starcgen code generator, or it can be done manually
+// by calling beam.RegisterFunction in an init() call.
+func init() {
+	beam.RegisterFunction(extractFn)
+	beam.RegisterFunction(formatFn)
+}
 
 var (
 	wordRE  = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index a52dd43..8603185 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -123,7 +123,7 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) {
 	if len(edges) == 0 {
 		return nil, errors.New("empty graph")
 	}
-	
+
 	tree := NewScopeTree(edges)
 
 	m := newMarshaller(opt)
diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
index e36692a..3c76725 100644
--- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
+++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
@@ -61,9 +61,10 @@ type Restriction struct {
 	Start, End int64
 }
 
-// EvenSplits splits a restriction into a number of evenly sized restrictions.
-// Each split restriction is guaranteed to not be empty, and each unit from the
-// original restriction is guaranteed to be contained in one split restriction.
+// EvenSplits splits a restriction into a number of evenly sized restrictions
+// in ascending order. Each split restriction is guaranteed to not be empty, and
+// each unit from the original restriction is guaranteed to be contained in one
+// split restriction.
 //
 // Num should be greater than 0. Otherwise there is no way to split the
 // restriction and this function will return the original restriction.
@@ -89,6 +90,28 @@ func (r Restriction) EvenSplits(num int64) (splits []Restriction) {
 	return splits
 }
 
+// SizedSplits splits a restriction into multiple restrictions of the given
+// size, in ascending order. If the restriction cannot be evenly split, the
+// final restriction will be the remainder.
+//
+// Example: (0, 24) split into size 10s -> {(0, 10), (10, 20), (20, 24)}
+//
+// Size should be greater than 0. Otherwise there is no way to split the
+// restriction and this function will return the original restriction.
+func (r Restriction) SizedSplits(size int64) (splits []Restriction) {
+	if size < 1 {
+		// Don't split, just return original restriction.
+		return append(splits, r)
+	}
+
+	s := r.Start
+	for e := s + size; e < r.End; s, e = e, e+size {
+		splits = append(splits, Restriction{Start: s, End: e})
+	}
+	splits = append(splits, Restriction{Start: s, End: r.End})
+	return splits
+}
+
 // Size returns the restriction's size as the difference between Start and End.
 func (r Restriction) Size() float64 {
 	return float64(r.End - r.Start)
diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
index 489e251..ba7da64 100644
--- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
+++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
@@ -63,10 +63,99 @@ func TestRestriction_EvenSplits(t *testing.T) {
 					t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v or %v",
 						split.Start, split.End, size, min, min+1)
 				}
-				// Check: All elements are still in a split restrictions. This
-				// logic assumes that the splits are returned in order which
-				// isn't guaranteed by EvenSplits, but this check is way easier
-				// with the assumption.
+				// Check: All elements are still in a split restriction and
+				// the restrictions are in the appropriate ascending order.
+				if split.Start != prevEnd {
+					t.Errorf("restriction range [%v, %v] missing after splits.",
+						prevEnd, split.Start)
+				} else {
+					prevEnd = split.End
+				}
+			}
+			if prevEnd != r.End {
+				t.Errorf("restriction range [%v, %v] missing after splits.",
+					prevEnd, r.End)
+			}
+		})
+	}
+}
+
+// TestRestriction_SizedSplits tests various splits and checks that they all
+// follow the contract for SizedSplits. This means that all restrictions match
+// the given size unless it is a remainder, and that each element is present
+// in the split restrictions.
+func TestRestriction_SizedSplits(t *testing.T) {
+	tests := []struct {
+		name string
+		rest Restriction
+		size int64
+		want []Restriction
+	}{
+		{
+			name: "Remainder",
+			rest: Restriction{Start: 0, End: 11},
+			size: 5,
+			want: []Restriction{{0, 5}, {5, 10}, {10, 11}},
+		},
+		{
+			name: "OffsetRemainder",
+			rest: Restriction{Start: 11, End: 22},
+			size: 5,
+			want: []Restriction{{11, 16}, {16, 21}, {21, 22}},
+		},
+		{
+			name: "OffsetExact",
+			rest: Restriction{Start: 11, End: 21},
+			size: 5,
+			want: []Restriction{{11, 16}, {16, 21}},
+		},
+		{
+			name: "LargeValues",
+			rest: Restriction{Start: 0, End: 1024 * 1024 * 1024},
+			size: 400 * 1024 * 1024,
+			want: []Restriction{
+				{0, 400 * 1024 * 1024},
+				{400 * 1024 * 1024, 800 * 1024 * 1024},
+				{800 * 1024 * 1024, 1024 * 1024 * 1024},
+			},
+		},
+		{
+			name: "OverlyLargeSize",
+			rest: Restriction{Start: 0, End: 5},
+			size: 10,
+			want: []Restriction{{0, 5}},
+		},
+		{
+			name: "InvalidSize",
+			rest: Restriction{Start: 0, End: 21},
+			size: 0,
+			want: []Restriction{{0, 21}},
+		},
+	}
+	for _, test := range tests {
+		test := test
+		t.Run(fmt.Sprintf("%v (rest[%v, %v], size = %v)",
+			test.name, test.rest.Start, test.rest.End, test.size), func(t *testing.T) {
+			r := test.rest
+
+			// Split the restriction into blocks of the requested size, then
+			// walk the splits in order, tracking where the previous one ended.
+			splits := r.SizedSplits(test.size)
+			prevEnd := r.Start
+			for i, split := range splits {
+				size := split.End - split.Start
+				// Check: Each restriction has at least 1 element.
+				if size == 0 {
+					t.Errorf("split restriction [%v, %v] is empty, size must be greater than 0.",
+						split.Start, split.End)
+				}
+				// Check: Restrictions (except for the last one) must match the split size.
+				if i != len(splits)-1 && size != test.size {
+					t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v",
+						split.Start, split.End, size, test.size)
+				}
+				// Check: All elements are still in a split restriction and
+				// the restrictions are in the appropriate ascending order.
 				if split.Start != prevEnd {
 					t.Errorf("restriction range [%v, %v] missing after splits.",
 						prevEnd, split.Start)
diff --git a/sdks/go/pkg/beam/io/textio/sdf.go b/sdks/go/pkg/beam/io/textio/sdf.go
new file mode 100644
index 0000000..bfccca9
--- /dev/null
+++ b/sdks/go/pkg/beam/io/textio/sdf.go
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package textio
+
+import (
+	"bufio"
+	"context"
+	"io"
+	"reflect"
+	"strings"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/go/pkg/beam/log"
+)
+
+func init() {
+	beam.RegisterType(reflect.TypeOf((*readSdfFn)(nil)).Elem())
+	beam.RegisterFunction(sizeFn)
+}
+
+// ReadSdf is a variation of Read implemented via SplittableDoFn. This should
+// result in increased performance with runners that support splitting.
+func ReadSdf(s beam.Scope, glob string) beam.PCollection {
+	s = s.Scope("textio.ReadSdf")
+
+	filesystem.ValidateScheme(glob)
+	return readSdf(s, beam.Create(s, glob))
+}
+
+// ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This
+// should result in increased performance with runners that support splitting.
+func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
+	s = s.Scope("textio.ReadAllSdf")
+
+	return readSdf(s, col)
+}
+
+// readSdf takes a PCollection of globs and returns a PCollection of lines from
+// all files in those globs. Unlike textio.read, this version uses an SDF for
+// reading files.
+func readSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
+	files := beam.ParDo(s, expandFn, col)
+	sized := beam.ParDo(s, sizeFn, files)
+	return beam.ParDo(s, &readSdfFn{}, sized)
+}
+
+// sizeFn pairs a filename with the size of that file in bytes.
+// TODO(BEAM-11109): Once CreateInitialRestriction supports Context params and
+// error return values, this can be done in readSdfFn.CreateInitialRestriction.
+func sizeFn(ctx context.Context, filename string) (string, int64, error) {
+	fs, err := filesystem.New(ctx, filename)
+	if err != nil {
+		return "", -1, err
+	}
+	defer fs.Close()
+
+	size, err := fs.Size(ctx, filename)
+	if err != nil {
+		return "", -1, err
+	}
+	return filename, size, nil
+}
+
+// readSdfFn reads individual lines from a text file, given a filename and a
+// size in bytes for that file.
+type readSdfFn struct {
+}
+
+// CreateInitialRestriction creates an offset range restriction representing
+// the file, using the paired size rather than fetching the file's size.
+func (fn *readSdfFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   size,
+	}
+}
+
+const (
+	// blockSize is the desired size of each block for initial splits.
+	blockSize int64 = 64 * 1024 * 1024 // 64 MB
+	// tooSmall is the size limit for a block. If the last block is no larger
+	// than this, it gets merged with the previous block.
+	tooSmall = blockSize / 4
+)
+
+// SplitRestriction splits each file restriction into blocks of a predetermined
+// size, with some checks to avoid having small remainders.
+func (fn *readSdfFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction {
+	splits := rest.SizedSplits(blockSize)
+	numSplits := len(splits)
+	if numSplits > 1 {
+		last := splits[numSplits-1]
+		if last.End-last.Start <= tooSmall {
+			// Last restriction is too small, so merge it with previous one.
+			splits[numSplits-2].End = last.End
+			splits = splits[:numSplits-1]
+		}
+	}
+	return splits
+}
+
+// RestrictionSize returns the size of each restriction as its range.
+func (fn *readSdfFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+// CreateTracker creates sdf.LockRTrackers wrapping offsetrange.Trackers for
+// each restriction.
+func (fn *readSdfFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+// ProcessElement outputs all lines in the file that begin within the paired
+// restriction.
+//
+// Note that restrictions may not align perfectly with lines. So lines can begin
+// before the restriction and end within it (those are ignored), and lines can
+// begin within the restriction and end past it (those are entirely
+// output, including the portion outside the restriction). In some cases a
+// valid restriction might not output any lines.
+func (fn *readSdfFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error {
+	log.Infof(ctx, "Reading from %v", filename)
+
+	fs, err := filesystem.New(ctx, filename)
+	if err != nil {
+		return err
+	}
+	defer fs.Close()
+
+	fd, err := fs.OpenRead(ctx, filename)
+	if err != nil {
+		return err
+	}
+	defer fd.Close()
+
+	rd := bufio.NewReader(fd)
+
+	i := rt.GetRestriction().(offsetrange.Restriction).Start
+	if i > 0 {
+		// If the restriction starts after 0, we cannot assume a new line starts
+		// at the beginning of the restriction, so we must search for the first
+		// line beginning at or after restriction.Start. This is done by
+		// scanning to the byte just before the restriction and then reading
+		// until the next newline, leaving the reader at the start of a new
+		// line past restriction.Start.
+		i -= 1
+		n, err := rd.Discard(int(i)) // Scan to just before restriction.
+		if err == io.EOF {
+			return errors.Errorf("TextIO restriction lies outside the file being read. "+
+				"Restriction begins at %v bytes, but file is only %v bytes.", i+1, n)
+		}
+		if err != nil {
+			return err
+		}
+		line, err := rd.ReadString('\n') // Read until the first line within the restriction.
+		if err == io.EOF {
+			// No lines start in the restriction but it's still valid, so
+			// finish claiming before returning to avoid errors.
+			rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End)
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		i += int64(len(line))
+	}
+
+	// Claim each line until we claim a line outside the restriction.
+	for rt.TryClaim(i) {
+		line, err := rd.ReadString('\n')
+		if err == io.EOF {
+			if len(line) != 0 {
+				emit(strings.TrimSuffix(line, "\n"))
+			}
+			// Finish claiming restriction before breaking to avoid errors.
+			rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End)
+			break
+		}
+		if err != nil {
+			return err
+		}
+		emit(strings.TrimSuffix(line, "\n"))
+		i += int64(len(line))
+	}
+	return nil
+}
diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go b/sdks/go/pkg/beam/io/textio/sdf_test.go
new file mode 100644
index 0000000..05a26fa
--- /dev/null
+++ b/sdks/go/pkg/beam/io/textio/sdf_test.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package textio
+
+import (
+	"context"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	_ "github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+)
+
+// TestReadSdf tests that readSdf successfully reads a test text file, and
+// outputs the correct number of lines for it, even for an exceedingly long
+// line.
+func TestReadSdf(t *testing.T) {
+	f := "../../../../data/textio_test.txt"
+	p, s := beam.NewPipelineWithRoot()
+	lines := ReadSdf(s, f)
+	passert.Count(s, lines, "NumLines", 1)
+
+	if err := beam.Run(context.Background(), "direct", p); err != nil {
+		t.Fatalf("Failed to execute job: %v", err)
+	}
+}
diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go
index 67a006c..e8bc7ed 100644
--- a/sdks/go/pkg/beam/io/textio/textio.go
+++ b/sdks/go/pkg/beam/io/textio/textio.go
@@ -104,7 +104,7 @@ func readFn(ctx context.Context, filename string, emit func(string)) error {
 			break
 		}
 		if err != nil {
-			return (err)
+			return err
 		}
 		emit(strings.TrimSuffix(line, "\n"))
 	}