Posted to commits@beam.apache.org by lo...@apache.org on 2022/12/02 23:33:13 UTC

[beam] 01/01: Return empty splits if unable to split, not errors

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch beam24505emptysplit
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7d058b4ede71dbafa9c47e0efbd72ed2e751122b
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Fri Dec 2 15:32:50 2022 -0800

    Return empty splits if unable to split, not errors
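
    A minimal sketch of the caller-side contract this change introduces,
    assuming the exported exec package names in the diff below; the
    trySplit helper and the concrete split points/sizes are illustrative
    only, not part of this change:

        package example

        import (
        	"context"

        	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
        )

        // trySplit is a hypothetical caller of Plan.Split under the new behavior.
        func trySplit(ctx context.Context, plan *exec.Plan) error {
        	// Ask the plan to split roughly in half over allowed points 0 and 3.
        	sr, err := plan.Split(ctx, exec.SplitPoints{
        		Splits:  []int64{0, 3},
        		Frac:    0.5,
        		BufSize: 10,
        	})
        	if err != nil {
        		// Still a hard error, e.g. a nil *DataSource or nil desired splits.
        		return err
        	}
        	if sr.Unsuccessful {
        		// No split was possible right now; report an empty split
        		// response and let the bundle keep processing.
        		return nil
        	}
        	// Otherwise sr.PI is the last element of the primary and sr.RI the
        	// first element of the residual (sr.PS/sr.RS carry sub-element splits).
        	return nil
        }

    The harness translates an Unsuccessful result into an empty
    ProcessBundleSplitResponse, as shown in the harness.go hunk below.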
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  6 ++--
 .../pkg/beam/core/runtime/exec/datasource_test.go  | 32 +++++++++++-----------
 .../go/pkg/beam/core/runtime/exec/dynsplit_test.go | 20 +++++++-------
 sdks/go/pkg/beam/core/runtime/exec/plan.go         | 13 +++++----
 sdks/go/pkg/beam/core/runtime/harness/harness.go   | 13 ++++++++-
 5 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index 9fa8df7500a..55b0b0a5cad 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -29,6 +29,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 )
 
 // DataSource is a Root execution unit.
@@ -453,7 +454,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 // sent to this DataSource, and is used to be able to perform accurate splits
 // even if the DataSource has not yet received all its elements. A bufSize of
 // 0 or less indicates that its unknown, and so uses the current known size.
-func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitResult, error) {
+func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) {
 	if n == nil {
 		return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
@@ -498,7 +499,8 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitRe
 	}
 	s, fr, err := splitHelper(n.index, bufSize, currProg, splits, frac, su != nil)
 	if err != nil {
-		return SplitResult{}, err
+		log.Infof(ctx, "Unsuccessful split: %v", err)
+		return SplitResult{Unsuccessful: true}, nil
 	}
 
 	// No fraction returned, perform channel split.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 46c00e68ee8..75a988fd31e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -303,7 +303,7 @@ func TestDataSource_Split(t *testing.T) {
 			runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx context.Context) error { return root.StartBundle(ctx, "1", dc) })
 
 			// SDK never splits on 0, so check that every test.
-			splitRes, err := p.Split(SplitPoints{Splits: []int64{0, test.splitIdx}})
+			splitRes, err := p.Split(ctx, SplitPoints{Splits: []int64{0, test.splitIdx}})
 			if err != nil {
 				t.Fatalf("error in Split: %v", err)
 			}
@@ -373,7 +373,7 @@ func TestDataSource_Split(t *testing.T) {
 					<-blockedCh
 					// Validate that we do not split on the element we're blocking on index.
 					// The first valid split is at test.splitIdx.
-					if splitRes, err := source.Split([]int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
+					if splitRes, err := source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
 						t.Errorf("error in Split: %v", err)
 					} else {
 						if got, want := splitRes.RI, test.splitIdx; got != want {
@@ -439,7 +439,7 @@ func TestDataSource_Split(t *testing.T) {
 
 		// SDK never splits on 0, so check that every test.
 		sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, BufSize: test.bufSize}
-		splitRes, err := p.Split(sp)
+		splitRes, err := p.Split(ctx, sp)
 		if err != nil {
 			t.Fatalf("error in Split: %v", err)
 		}
@@ -505,7 +505,7 @@ func TestDataSource_Split(t *testing.T) {
 					<-blockedCh
 					// Validate that we either do or do not perform a sub-element split with the
 					// given fraction.
-					if splitRes, err := source.Split([]int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); err != nil {
+					if splitRes, err := source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); err != nil {
 						t.Errorf("error in Split: %v", err)
 					} else {
 						// For sub-element splits, check sub-element split only results.
@@ -566,8 +566,8 @@ func TestDataSource_Split(t *testing.T) {
 		dc := DataContext{Data: &TestDataManager{R: pr}}
 		ctx := context.Background()
 
-		if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err == nil {
-			t.Fatal("plan uninitialized, expected error when splitting, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(before active) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		for i, root := range p.units {
 			if err := root.Up(ctx); err != nil {
@@ -575,30 +575,30 @@ func TestDataSource_Split(t *testing.T) {
 			}
 		}
 		p.status = Active
-		if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err == nil {
-			t.Fatal("plan not started, expected error when splitting, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(active, not started) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx context.Context) error { return root.StartBundle(ctx, "1", dc) })
-		if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
-			t.Fatal("plan started, expected error when splitting, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(active) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		runOnRoots(ctx, t, p, "Process", Root.Process)
-		if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
-			t.Fatal("plan in progress, expected error when unable to get a desired split, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(active, unable to get desired split) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle)
-		if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
-			t.Fatal("plan finished, expected error when splitting, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(finished) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		validateSource(t, out, source, makeValues(elements...))
 	})
 
 	t.Run("sanity_errors", func(t *testing.T) {
 		var source *DataSource
-		if _, err := source.Split([]int64{0}, -1, 0); err == nil {
+		if _, err := source.Split(context.Background(), []int64{0}, -1, 0); err == nil {
 			t.Fatal("expected error splitting nil *DataSource")
 		}
-		if _, err := source.Split(nil, -1, 0); err == nil {
+		if _, err := source.Split(context.Background(), nil, -1, 0); err == nil {
 			t.Fatal("expected error splitting nil desired splits")
 		}
 	})
diff --git a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
index cb7a6c48cb5..16c2c902f49 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
@@ -44,7 +44,7 @@ func TestDynamicSplit(t *testing.T) {
 		name string
 		// driver is a function determining how the processing and splitting
 		// threads are created and coordinated.
-		driver func(*Plan, DataContext, *splitTestSdf) (splitResult, error)
+		driver func(context.Context, *Plan, DataContext, *splitTestSdf) (splitResult, error)
 	}{
 		{
 			// Complete a split before beginning processing.
@@ -81,7 +81,7 @@ func TestDynamicSplit(t *testing.T) {
 			dc := DataContext{Data: &TestDataManager{R: pr}}
 
 			// Call driver to coordinate processing & splitting threads.
-			splitRes, procRes := test.driver(plan, dc, sdf)
+			splitRes, procRes := test.driver(context.Background(), plan, dc, sdf)
 
 			// Validate we get a valid split result, aside from split elements.
 			if splitRes.err != nil {
@@ -141,7 +141,7 @@ func TestDynamicSplit(t *testing.T) {
 
 // nonBlockingDriver performs a split before starting processing, so no thread
 // is forced to wait on a mutex.
-func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func nonBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
 	// Begin processing pipeline.
 	procResCh := make(chan error)
 	go processPlan(plan, dc, procResCh)
@@ -149,7 +149,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes
 
 	// Complete a split before unblocking processing.
 	splitResCh := make(chan splitResult)
-	go splitPlan(plan, splitResCh)
+	go splitPlan(ctx, plan, splitResCh)
 	<-rt.split
 	<-rt.blockSplit
 	splitRes = <-splitResCh
@@ -166,7 +166,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes
 
 // splitBlockingDriver blocks on a split request so that the SDF attempts to
 // claim while the split is occurring.
-func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func splitBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
 	// Begin processing pipeline.
 	procResCh := make(chan error)
 	go processPlan(plan, dc, procResCh)
@@ -174,7 +174,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
 
 	// Start a split, but block on it so it holds the mutex.
 	splitResCh := make(chan splitResult)
-	go splitPlan(plan, splitResCh)
+	go splitPlan(ctx, plan, splitResCh)
 	<-rt.split
 
 	// Start processing and start a claim, that'll be waiting for the mutex.
@@ -195,7 +195,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
 
 // claimBlockingDriver blocks on a claim request so that the SDF attempts to
 // split while the claim is occurring.
-func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func claimBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
 	// Begin processing pipeline.
 	procResCh := make(chan error)
 	go processPlan(plan, dc, procResCh)
@@ -207,7 +207,7 @@ func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
 
 	// Start a split that'll be waiting for the mutex.
 	splitResCh := make(chan splitResult)
-	go splitPlan(plan, splitResCh)
+	go splitPlan(ctx, plan, splitResCh)
 	<-rt.split
 
 	// Unblock the claim, freeing the mutex (but not finishing processing yet).
@@ -333,8 +333,8 @@ type splitResult struct {
 
 // splitPlan is meant to be the goroutine representing the thread handling a
 // split request for the SDF.
-func splitPlan(plan *Plan, result chan splitResult) {
-	split, err := plan.Split(SplitPoints{Frac: 0.5, BufSize: 1})
+func splitPlan(ctx context.Context, plan *Plan, result chan splitResult) {
+	split, err := plan.Split(ctx, SplitPoints{Frac: 0.5, BufSize: 1})
 	result <- splitResult{split: split, err: err}
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index f1a6f998e5b..0189de51c7f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -251,6 +251,8 @@ type SplitPoints struct {
 
 // SplitResult contains the result of performing a split on a Plan.
 type SplitResult struct {
+	Unsuccessful bool // Indicates the split was unsuccessful.
+
 	// Indices are always included, for both channel and sub-element splits.
 	PI int64 // Primary index, last element of the primary.
 	RI int64 // Residual index, first element of the residual.
@@ -268,13 +270,14 @@ type SplitResult struct {
 // Split takes a set of potential split indexes, and if successful returns
 // the split result.
 // Returns an error when unable to split.
-func (p *Plan) Split(s SplitPoints) (SplitResult, error) {
+func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error) {
+	// Can't split inactive plans.
+	if p.status != Active {
+		return SplitResult{Unsuccessful: true}, nil
+	}
 	// TODO: When bundles with multiple sources, are supported, perform splits
 	// on all sources.
-	if p.source != nil {
-		return p.source.Split(s.Splits, s.Frac, s.BufSize)
-	}
-	return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, Source not initialized", s)
+	return p.source.Split(ctx, s.Splits, s.Frac, s.BufSize)
 }
 
 // Checkpoint attempts to split an SDF if the DoFn self-checkpointed.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 95c1729166b..1e4bdc30806 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -573,7 +573,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		if ds == nil {
 			return fail(ctx, instID, "failed to split: desired splits for root of %v was empty.", ref)
 		}
-		sr, err := plan.Split(exec.SplitPoints{
+		sr, err := plan.Split(ctx, exec.SplitPoints{
 			Splits:  ds.GetAllowedSplitPoints(),
 			Frac:    ds.GetFractionOfRemainder(),
 			BufSize: ds.GetEstimatedInputElements(),
@@ -583,6 +583,17 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 			return fail(ctx, instID, "unable to split %v: %v", ref, err)
 		}
 
+		// Unsuccessful splits without errors indicate we should return an empty response,
+		// as processing can continue.
+		if sr.Unsuccessful {
+			return &fnpb.InstructionResponse{
+				InstructionId: string(instID),
+				Response: &fnpb.InstructionResponse_ProcessBundleSplit{
+					ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
+				},
+			}
+		}
+
 		var pRoots []*fnpb.BundleApplication
 		var rRoots []*fnpb.DelayedBundleApplication
 		if sr.PS != nil && len(sr.PS) > 0 && sr.RS != nil && len(sr.RS) > 0 {