You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/12/08 03:12:19 UTC
[beam] branch master updated: Return empty splits if unable to split, not errors (#24508)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 80980b8be48 Return empty splits if unable to split, not errors (#24508)
80980b8be48 is described below
commit 80980b8be48ece9c6d61dc28f429374b8e7a0e4b
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Wed Dec 7 19:12:12 2022 -0800
Return empty splits if unable to split, not errors (#24508)
Co-authored-by: lostluck <13...@users.noreply.github.com>
---
sdks/go/pkg/beam/core/runtime/exec/datasource.go | 6 ++--
.../pkg/beam/core/runtime/exec/datasource_test.go | 32 +++++++++++-----------
.../go/pkg/beam/core/runtime/exec/dynsplit_test.go | 20 +++++++-------
sdks/go/pkg/beam/core/runtime/exec/plan.go | 13 +++++----
sdks/go/pkg/beam/core/runtime/harness/harness.go | 13 ++++++++-
5 files changed, 50 insertions(+), 34 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index 9fa8df7500a..55b0b0a5cad 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)
// DataSource is a Root execution unit.
@@ -453,7 +454,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
// sent to this DataSource, and is used to be able to perform accurate splits
// even if the DataSource has not yet received all its elements. A bufSize of
// 0 or less indicates that its unknown, and so uses the current known size.
-func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitResult, error) {
+func (n *DataSource) Split(ctx context.Context, splits []int64, frac float64, bufSize int64) (SplitResult, error) {
if n == nil {
return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
}
@@ -498,7 +499,8 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (SplitRe
}
s, fr, err := splitHelper(n.index, bufSize, currProg, splits, frac, su != nil)
if err != nil {
- return SplitResult{}, err
+ log.Infof(ctx, "Unsuccessful split: %v", err)
+ return SplitResult{Unsuccessful: true}, nil
}
// No fraction returned, perform channel split.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 970dbb95b4d..19f639b31d0 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -303,7 +303,7 @@ func TestDataSource_Split(t *testing.T) {
runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx context.Context) error { return root.StartBundle(ctx, "1", dc) })
// SDK never splits on 0, so check that every test.
- splitRes, err := p.Split(SplitPoints{Splits: []int64{0, test.splitIdx}})
+ splitRes, err := p.Split(ctx, SplitPoints{Splits: []int64{0, test.splitIdx}})
if err != nil {
t.Fatalf("error in Split: %v", err)
}
@@ -373,7 +373,7 @@ func TestDataSource_Split(t *testing.T) {
<-blockedCh
// Validate that we do not split on the element we're blocking on index.
// The first valid split is at test.splitIdx.
- if splitRes, err := source.Split([]int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
+ if splitRes, err := source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
t.Errorf("error in Split: %v", err)
} else {
if got, want := splitRes.RI, test.splitIdx; got != want {
@@ -439,7 +439,7 @@ func TestDataSource_Split(t *testing.T) {
// SDK never splits on 0, so check that every test.
sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, BufSize: test.bufSize}
- splitRes, err := p.Split(sp)
+ splitRes, err := p.Split(ctx, sp)
if err != nil {
t.Fatalf("error in Split: %v", err)
}
@@ -505,7 +505,7 @@ func TestDataSource_Split(t *testing.T) {
<-blockedCh
// Validate that we either do or do not perform a sub-element split with the
// given fraction.
- if splitRes, err := source.Split([]int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); err != nil {
+ if splitRes, err := source.Split(context.Background(), []int64{0, 1, 2, 3, 4, 5}, test.fraction, int64(len(elements))); err != nil {
t.Errorf("error in Split: %v", err)
} else {
// For sub-element splits, check sub-element split only results.
@@ -566,8 +566,8 @@ func TestDataSource_Split(t *testing.T) {
dc := DataContext{Data: &TestDataManager{R: pr}}
ctx := context.Background()
- if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err == nil {
- t.Fatal("plan uninitialized, expected error when splitting, got nil")
+ if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err != nil || !sr.Unsuccessful {
+ t.Fatalf("p.Split(before active) = %v,%v want unsuccessful split & nil err", sr, err)
}
for i, root := range p.units {
if err := root.Up(ctx); err != nil {
@@ -575,30 +575,30 @@ func TestDataSource_Split(t *testing.T) {
}
}
p.status = Active
- if _, err := p.Split(SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err == nil {
- t.Fatal("plan not started, expected error when splitting, got nil")
+ if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0, 3}, Frac: -1}); err != nil || !sr.Unsuccessful {
+ t.Fatalf("p.Split(active, not started) = %v,%v want unsuccessful split & nil err", sr, err)
}
runOnRoots(ctx, t, p, "StartBundle", func(root Root, ctx context.Context) error { return root.StartBundle(ctx, "1", dc) })
- if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
- t.Fatal("plan started, expected error when splitting, got nil")
+ if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+ t.Fatalf("p.Split(active) = %v,%v want unsuccessful split & nil err", sr, err)
}
runOnRoots(ctx, t, p, "Process", Root.Process)
- if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
- t.Fatal("plan in progress, expected error when unable to get a desired split, got nil")
+ if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+ t.Fatalf("p.Split(active, unable to get desired split) = %v,%v want unsuccessful split & nil err", sr, err)
}
runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle)
- if _, err := p.Split(SplitPoints{Splits: []int64{0}, Frac: -1}); err == nil {
- t.Fatal("plan finished, expected error when splitting, got nil")
+ if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful {
+ t.Fatalf("p.Split(finished) = %v,%v want unsuccessful split & nil err", sr, err)
}
validateSource(t, out, source, makeValues(elements...))
})
t.Run("sanity_errors", func(t *testing.T) {
var source *DataSource
- if _, err := source.Split([]int64{0}, -1, 0); err == nil {
+ if _, err := source.Split(context.Background(), []int64{0}, -1, 0); err == nil {
t.Fatal("expected error splitting nil *DataSource")
}
- if _, err := source.Split(nil, -1, 0); err == nil {
+ if _, err := source.Split(context.Background(), nil, -1, 0); err == nil {
t.Fatal("expected error splitting nil desired splits")
}
})
diff --git a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
index 8318b96b301..1fa3ae94c86 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
@@ -44,7 +44,7 @@ func TestDynamicSplit(t *testing.T) {
name string
// driver is a function determining how the processing and splitting
// threads are created and coordinated.
- driver func(*Plan, DataContext, *splitTestSdf) (splitResult, error)
+ driver func(context.Context, *Plan, DataContext, *splitTestSdf) (splitResult, error)
}{
{
// Complete a split before beginning processing.
@@ -81,7 +81,7 @@ func TestDynamicSplit(t *testing.T) {
dc := DataContext{Data: &TestDataManager{R: pr}}
// Call driver to coordinate processing & splitting threads.
- splitRes, procRes := test.driver(plan, dc, sdf)
+ splitRes, procRes := test.driver(context.Background(), plan, dc, sdf)
// Validate we get a valid split result, aside from split elements.
if splitRes.err != nil {
@@ -141,7 +141,7 @@ func TestDynamicSplit(t *testing.T) {
// nonBlockingDriver performs a split before starting processing, so no thread
// is forced to wait on a mutex.
-func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func nonBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
// Begin processing pipeline.
procResCh := make(chan error)
go processPlan(plan, dc, procResCh)
@@ -149,7 +149,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes
// Complete a split before unblocking processing.
splitResCh := make(chan splitResult)
- go splitPlan(plan, splitResCh)
+ go splitPlan(ctx, plan, splitResCh)
<-rt.split
<-rt.blockSplit
splitRes = <-splitResCh
@@ -166,7 +166,7 @@ func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes
// splitBlockingDriver blocks on a split request so that the SDF attempts to
// claim while the split is occurring.
-func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func splitBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
// Begin processing pipeline.
procResCh := make(chan error)
go processPlan(plan, dc, procResCh)
@@ -174,7 +174,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
// Start a split, but block on it so it holds the mutex.
splitResCh := make(chan splitResult)
- go splitPlan(plan, splitResCh)
+ go splitPlan(ctx, plan, splitResCh)
<-rt.split
// Start processing and start a claim, that'll be waiting for the mutex.
@@ -195,7 +195,7 @@ func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
// claimBlockingDriver blocks on a claim request so that the SDF attempts to
// split while the claim is occurring.
-func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
+func claimBlockingDriver(ctx context.Context, plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRes splitResult, procRes error) {
// Begin processing pipeline.
procResCh := make(chan error)
go processPlan(plan, dc, procResCh)
@@ -207,7 +207,7 @@ func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (splitRe
// Start a split that'll be waiting for the mutex.
splitResCh := make(chan splitResult)
- go splitPlan(plan, splitResCh)
+ go splitPlan(ctx, plan, splitResCh)
<-rt.split
// Unblock the claim, freeing the mutex (but not finishing processing yet).
@@ -333,8 +333,8 @@ type splitResult struct {
// splitPlan is meant to be the goroutine representing the thread handling a
// split request for the SDF.
-func splitPlan(plan *Plan, result chan splitResult) {
- split, err := plan.Split(SplitPoints{Frac: 0.5, BufSize: 1})
+func splitPlan(ctx context.Context, plan *Plan, result chan splitResult) {
+ split, err := plan.Split(ctx, SplitPoints{Frac: 0.5, BufSize: 1})
result <- splitResult{split: split, err: err}
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index f1a6f998e5b..0189de51c7f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -251,6 +251,8 @@ type SplitPoints struct {
// SplitResult contains the result of performing a split on a Plan.
type SplitResult struct {
+ Unsuccessful bool // Indicates the split was unsuccessful.
+
// Indices are always included, for both channel and sub-element splits.
PI int64 // Primary index, last element of the primary.
RI int64 // Residual index, first element of the residual.
@@ -268,13 +270,14 @@ type SplitResult struct {
// Split takes a set of potential split indexes, and if successful returns
// the split result.
// Returns an error when unable to split.
-func (p *Plan) Split(s SplitPoints) (SplitResult, error) {
+func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error) {
+ // Can't split inactive plans.
+ if p.status != Active {
+ return SplitResult{Unsuccessful: true}, nil
+ }
// TODO: When bundles with multiple sources, are supported, perform splits
// on all sources.
- if p.source != nil {
- return p.source.Split(s.Splits, s.Frac, s.BufSize)
- }
- return SplitResult{}, fmt.Errorf("failed to split at requested splits: {%v}, Source not initialized", s)
+ return p.source.Split(ctx, s.Splits, s.Frac, s.BufSize)
}
// Checkpoint attempts to split an SDF if the DoFn self-checkpointed.
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 710566d115a..6b2386cc2e3 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -573,7 +573,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
if ds == nil {
return fail(ctx, instID, "failed to split: desired splits for root of %v was empty.", ref)
}
- sr, err := plan.Split(exec.SplitPoints{
+ sr, err := plan.Split(ctx, exec.SplitPoints{
Splits: ds.GetAllowedSplitPoints(),
Frac: ds.GetFractionOfRemainder(),
BufSize: ds.GetEstimatedInputElements(),
@@ -583,6 +583,17 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
return fail(ctx, instID, "unable to split %v: %v", ref, err)
}
+ // Unsuccessful splits without errors indicate we should return an empty response,
+ // as processing can continue.
+ if sr.Unsuccessful {
+ return &fnpb.InstructionResponse{
+ InstructionId: string(instID),
+ Response: &fnpb.InstructionResponse_ProcessBundleSplit{
+ ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
+ },
+ }
+ }
+
var pRoots []*fnpb.BundleApplication
var rRoots []*fnpb.DelayedBundleApplication
if sr.PS != nil && len(sr.PS) > 0 && sr.RS != nil && len(sr.RS) > 0 {