You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/12/02 23:33:12 UTC

[beam] branch beam24505emptysplit created (now 7d058b4ede7)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch beam24505emptysplit
in repository https://gitbox.apache.org/repos/asf/beam.git


      at 7d058b4ede7 Return empty splits if unable to split, not errors

This branch includes the following new commits:

     new 7d058b4ede7 Return empty splits if unable to split, not errors

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Return empty splits if unable to split, not errors

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch beam24505emptysplit
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7d058b4ede71dbafa9c47e0efbd72ed2e751122b
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Fri Dec 2 15:32:50 2022 -0800

    Return empty splits if unable to split, not errors
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |  6 ++--
 .../pkg/beam/core/runtime/exec/datasource_test.go  | 32 +++++++++++-----------
 .../go/pkg/beam/core/runtime/exec/dynsplit_test.go | 20 +++++++-------
 sdks/go/pkg/beam/core/runtime/exec/plan.go         | 13 +++++----
 sdks/go/pkg/beam/core/runtime/harness/harness.go   | 13 ++++++++-
 5 files changed, 50 insertions(+), 34 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index 9fa8df7500a..55b0b0a5cad 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -29,6 +29,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 )
 
 // DataSource is a Root execution unit.
@@ -453,7 +454,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 // sent to this DataSource, and is used to be able to perform accurate splits
 // even if the DataSource has not yet received all its elements. A bufSize of
 // 0 or less indicates that its unknown, and so uses the current known size.
-func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitResult, error) {
+func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) {
 	if n == nil {
 		return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
@@ -498,7 +499,8 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitRe
 	}
 	s, fr, err := splitHelper(n.index, bufSize, currProg, splits, frac, su != nil)
 	if err != nil {
-		return SplitResult{}, err
+		log.Infof(ctx, "Unsuccessful split: %v", err)
+		return SplitResult{Unsuccessful: true}, nil
 	}
 
 	// No fraction returned, perform channel split.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 46c00e68ee8..75a988fd31e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -303,7 +303,7 @@ func TestDataSource_Split(t *testing.T) {
 			runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx context.Context) error { return root.StartBundle(ctx, "1", dc) })
 
 			// SDK never splits on 0, so check that every test.
-			splitRes, err := p.Split(SplitPoints{Splits: []int64{0, test.splitIdx}})
+			splitRes, err := p.Split(ctx, SplitPoints{Splits: []int64{0, test.splitIdx}})
 			if err != nil {
 				t.Fatalf("error in Split: %v", err)
 			}
@@ -373,7 +373,7 @@ func TestDataSource_Split(t *testing.T) {
 					<-blockedCh
 					// Validate that we do not split on the element we're blocking on index.
 					// The first valid split is at test.splitIdx.
-					if splitRes, err := source.Split([]int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
+					if splitRes, err := source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
 						t.Errorf("error in Split: %v", err)
 					} else {
 						if got, want := splitRes.RI, test.splitIdx; got != want {
@@ -439,7 +439,7 @@ func TestDataSource_Split(t *testing.T) {
 
 		// SDK never splits on 0, so check that every test.
 		sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, BufSize: test.bufSize}
-		splitRes, err := p.Split(sp)
+		splitRes, err := p.Split(ctx, sp)
 		if err != nil {
 			t.Fatalf("error in Split: %v", err)
 		}
@@ -505,7 +505,7 @@ func TestDataSource_Split(t *testing.T) {
 					<-blockedCh
 					// Validate that we either do or do not perform a sub-element split with the
 					// given fraction.
-					if splitRes, err := source.Split([]int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); err != nil {
+					if splitRes, err := source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); err != nil {
 						t.Errorf("error in Split: %v", err)
 					} else {
 						// For sub-element splits, check sub-element split only results.
@@ -566,8 +566,8 @@ func TestDataSource_Split(t *testing.T) {
 		dc := DataContext{Data: &TestDataManager{R: pr}}
 		ctx := context.Background()
 
-		if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err == nil {
-			t.Fatal("plan uninitialized, expected error when splitting, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(before active) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		for i, root := range p.units {
 			if err := root.Up(ctx); err != nil {
@@ -575,30 +575,30 @@ func TestDataSource_Split(t *testing.T) {
 			}
 		}
 		p.status = Active
-		if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err == nil {
-			t.Fatal("plan not started, expected error when splitting, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(active, not started) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx context.Context) error { return root.StartBundle(ctx, "1", dc) })
-		if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
-			t.Fatal("plan started, expected error when splitting, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(active) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		runOnRoots(ctx, t, p, "Process", Root.Process)
-		if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
-			t.Fatal("plan in progress, expected error when unable to get a desired split, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(active, unable to get desired split) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle)
-		if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
-			t.Fatal("plan finished, expected error when splitting, got nil")
+		if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+			t.Fatalf("p.Split(finished) = %v,%v want unsuccessful split & nil err", sr, err)
 		}
 		validateSource(t, out, source, makeValues(elements...))
 	})
 
 	t.Run("sanity_errors", func(t *testing.T) {
 		var source *DataSource
-		if _, err := source.Split([]int64{0}, -1, 0); err == nil {
+		if _, err := source.Split(context.Background(), []int64{0}, -1, 0); err == nil {
 			t.Fatal("expected error splitting nil *DataSource")
 		}
-		if _, err := source.Split(nil, -1, 0); err == nil {
+		if _, err := source.Split(context.Background(), nil, -1, 0); err == nil {
 			t.Fatal("expected error splitting nil desired splits")
 		}
 	})
diff --git a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
index cb7a6c48cb5..16c2c902f49 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
@@ -44,7 +44,7 @@ func TestDynamicSplit(t *testing.T) {
 		name string
 		// driver is a function determining how the processing and splitting
 		// threads are created and coordinated.
-		driver func(*Plan, DataContext, *splitTestSdf) (splitResult, error)
+		driver func(context.Context, *Plan, DataContext, *splitTestSdf) (splitResult, error)
 	}{
 		{
 			// Complete a split before beginning processing.
@@ -81,7 +81,7 @@ func TestDynamicSplit(t *testing.T) {
 			dc := DataContext{Data: &TestDataManager{R: pr}}
 
 			// Call driver to coordinate processing & splitting threads.
-			splitRes, procRes := test.driver(plan, dc, sdf)
+			splitRes, procRes := test.driver(context.Background(), plan, dc, sdf)
 
 			// Validate we get a valid split result, aside from split elements.
 			if splitRes.err != nil {
@@ -141,7 +141,7 @@ func TestDynamicSplit(t *testing.T) {
 
 // nonBlockingDriver performs a split before starting processing, so no thread
 // is forced to wait on a mutex.
-func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func nonBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
 	// Begin processing pipeline.
 	procResCh := make(chan error)
 	go processPlan(plan, dc, procResCh)
@@ -149,7 +149,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes
 
 	// Complete a split before unblocking processing.
 	splitResCh := make(chan splitResult)
-	go splitPlan(plan, splitResCh)
+	go splitPlan(ctx, plan, splitResCh)
 	<-rt.split
 	<-rt.blockSplit
 	splitRes = <-splitResCh
@@ -166,7 +166,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes
 
 // splitBlockingDriver blocks on a split request so that the SDF attempts to
 // claim while the split is occurring.
-func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func splitBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
 	// Begin processing pipeline.
 	procResCh := make(chan error)
 	go processPlan(plan, dc, procResCh)
@@ -174,7 +174,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
 
 	// Start a split, but block on it so it holds the mutex.
 	splitResCh := make(chan splitResult)
-	go splitPlan(plan, splitResCh)
+	go splitPlan(ctx, plan, splitResCh)
 	<-rt.split
 
 	// Start processing and start a claim, that'll be waiting for the mutex.
@@ -195,7 +195,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
 
 // claimBlockingDriver blocks on a claim request so that the SDF attempts to
 // split while the claim is occurring.
-func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func claimBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
 	// Begin processing pipeline.
 	procResCh := make(chan error)
 	go processPlan(plan, dc, procResCh)
@@ -207,7 +207,7 @@ func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
 
 	// Start a split that'll be waiting for the mutex.
 	splitResCh := make(chan splitResult)
-	go splitPlan(plan, splitResCh)
+	go splitPlan(ctx, plan, splitResCh)
 	<-rt.split
 
 	// Unblock the claim, freeing the mutex (but not finishing processing yet).
@@ -333,8 +333,8 @@ type splitResult struct {
 
 // splitPlan is meant to be the goroutine representing the thread handling a
 // split request for the SDF.
-func splitPlan(plan *Plan, result chan splitResult) {
-	split, err := plan.Split(SplitPoints{Frac: 0.5, BufSize: 1})
+func splitPlan(ctx context.Context, plan *Plan, result chan splitResult) {
+	split, err := plan.Split(ctx, SplitPoints{Frac: 0.5, BufSize: 1})
 	result <- splitResult{split: split, err: err}
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index f1a6f998e5b..0189de51c7f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -251,6 +251,8 @@ type SplitPoints struct {
 
 // SplitResult contains the result of performing a split on a Plan.
 type SplitResult struct {
+	Unsuccessful bool // Indicates the split was unsuccessful.
+
 	// Indices are always included, for both channel and sub-element splits.
 	PI int64 // Primary index, last element of the primary.
 	RI int64 // Residual index, first element of the residual.
@@ -268,13 +270,14 @@ type SplitResult struct {
 // Split takes a set of potential split indexes, and if successful returns
 // the split result.
 // Returns an error when unable to split.
-func (p *Plan) Split(s SplitPoints) (SplitResult, error) {
+func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error) {
+	// Can't split inactive plans.
+	if p.status != Active {
+		return SplitResult{Unsuccessful: true}, nil
+	}
 	// TODO: When bundles with multiple sources, are supported, perform splits
 	// on all sources.
-	if p.source != nil {
-		return p.source.Split(s.Splits, s.Frac, s.BufSize)
-	}
-	return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, Source not initialized", s)
+	return p.source.Split(ctx, s.Splits, s.Frac, s.BufSize)
 }
 
 // Checkpoint attempts to split an SDF if the DoFn self-checkpointed.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 95c1729166b..1e4bdc30806 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -573,7 +573,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 		if ds == nil {
 			return fail(ctx, instID, "failed to split: desired splits for root of %v was empty.", ref)
 		}
-		sr, err := plan.Split(exec.SplitPoints{
+		sr, err := plan.Split(ctx, exec.SplitPoints{
 			Splits:  ds.GetAllowedSplitPoints(),
 			Frac:    ds.GetFractionOfRemainder(),
 			BufSize: ds.GetEstimatedInputElements(),
@@ -583,6 +583,17 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 			return fail(ctx, instID, "unable to split %v: %v", ref, err)
 		}
 
+		// Unsuccessful splits without errors indicate we should return an empty response,
+		// as processing can confinue.
+		if sr.Unsuccessful {
+			return &fnpb.InstructionResponse{
+				InstructionId: string(instID),
+				Response: &fnpb.InstructionResponse_ProcessBundleSplit{
+					ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
+				},
+			}
+		}
+
 		var pRoots []*fnpb.BundleApplication
 		var rRoots []*fnpb.DelayedBundleApplication
 		if sr.PS != nil && len(sr.PS) > 0 && sr.RS != nil && len(sr.RS) > 0 {