You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/19 22:37:16 UTC
[beam] branch master updated: [BEAM-14484] Improve behavior surrounding primary roots in self-checkpointing (#17716)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f1980dc917c [BEAM-14484] Improve behavior surrounding primary roots in self-checkpointing (#17716)
f1980dc917c is described below
commit f1980dc917c939a9c8178de62daeda1405032701
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu May 19 18:37:10 2022 -0400
[BEAM-14484] Improve behavior surrounding primary roots in self-checkpointing (#17716)
---
sdks/go/pkg/beam/core/runtime/exec/datasource.go | 43 +++++++++++++-
.../pkg/beam/core/runtime/exec/datasource_test.go | 69 ++++++++++++++++++++++
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 1 +
sdks/go/pkg/beam/core/sdf/sdf.go | 4 ++
sdks/go/pkg/beam/core/sdf/wrappedbounded.go | 34 +++++++++++
.../test/integration/primitives/checkpointing.go | 16 ++++-
6 files changed, 162 insertions(+), 5 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index e47fee00c96..8c39aeaed41 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)
// DataSource is a Root execution unit.
@@ -348,6 +349,27 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
return encodeElms
}
+// getBoundedRTrackerFromRoot extracts the restriction tracker and the restriction size from a
+// split root. The root is expected to have the structure
+// KV<KV<Elm, KV<RTracker, watermarkEstimatorState>>, Size>, where Size is a float64.
+// RTrackers that do not implement sdf.BoundableRTracker are wrapped with sdf.NewWrappedTracker
+// and treated as bounded. If the root does not match the expected structure, a warning is
+// logged and ok is false; every type assertion is checked so a malformed root never panics.
+func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
+	if root == nil {
+		log.Warn(context.Background(), "expected a non-nil root")
+		return nil, -1.0, false
+	}
+	elm, ok := root.Elm.(*FullValue)
+	if !ok {
+		log.Warnf(context.Background(), "expected root element to be type *FullValue, got type %T", root.Elm)
+		return nil, -1.0, false
+	}
+	restAndState, ok := elm.Elm2.(*FullValue)
+	if !ok {
+		log.Warnf(context.Background(), "expected restriction and watermark estimator state to be type *FullValue, got type %T", elm.Elm2)
+		return nil, -1.0, false
+	}
+	tElm := restAndState.Elm
+	tracker, ok := tElm.(sdf.RTracker)
+	if !ok {
+		log.Warnf(context.Background(), "expected type sdf.RTracker, got type %T", tElm)
+		return nil, -1.0, false
+	}
+	boundTracker, ok := tracker.(sdf.BoundableRTracker)
+	if !ok {
+		log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()")
+		// Assume an RTracker that does not implement IsBounded() will always be bounded, wrap so it can be used.
+		boundTracker = sdf.NewWrappedTracker(tracker)
+	}
+	size, ok := root.Elm2.(float64)
+	if !ok {
+		log.Warnf(context.Background(), "expected size to be type float64, got type %T", root.Elm2)
+		return nil, -1.0, false
+	}
+	return boundTracker, size, true
+}
+
// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
// ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
// splittable or has not returned a resuming continuation, the function returns an empty
@@ -366,13 +388,30 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
ow := su.GetOutputWatermark()
- // Always split at fraction 0.0, should have no primaries left.
+ // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+ // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+ // that are bounded and of size 0 as they represent no remaining work.
ps, rs, err := su.Split(0.0)
if err != nil {
return SplitResult{}, -1 * time.Minute, false, err
}
+ if len(rs) == 0 {
+ return SplitResult{}, -1 * time.Minute, false, nil
+ }
if len(ps) != 0 {
- return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+ // Expected structure of the root FullValue is KV<KV<Elm, KV<BoundedRTracker, watermarkEstimatorState>>, Size>
+ for _, root := range ps {
+ tracker, size, ok := getBoundedRTrackerFromRoot(root)
+ // If type assertion didn't return a BoundableRTracker, we move on.
+ if !ok {
+ log.Warnf(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root)
+ continue
+ }
+ if !tracker.IsBounded() || size > 0.00001 {
+ return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps)
+ }
+ }
+
}
encodeElms := n.makeEncodeElms()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index bf367ae1cb4..365cf52062f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -914,3 +915,71 @@ func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected
t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
}
}
+
+// constructRootFullValue builds a split root shaped like KV<KV<nil, KV<rt, nil>>, size>,
+// placing rt where a restriction tracker is expected and size where the restriction size
+// is expected.
+func constructRootFullValue(rt, size interface{}) *FullValue {
+	restriction := &FullValue{Elm: rt}
+	element := &FullValue{Elm2: restriction}
+	return &FullValue{Elm: element, Elm2: size}
+}
+
+// TestGetRTrackerFromRoot checks that getBoundedRTrackerFromRoot extracts the tracker and size
+// from a well-formed root, and rejects roots whose tracker or size has the wrong type.
+func TestGetRTrackerFromRoot(t *testing.T) {
+	tests := []struct {
+		name    string
+		inRt    interface{}
+		inSize  interface{}
+		valid   bool
+		expSize float64
+	}{
+		{
+			name:    "valid",
+			inRt:    offsetrange.NewTracker(offsetrange.Restriction{Start: 0, End: 1}),
+			inSize:  1.0,
+			valid:   true,
+			expSize: 1.0,
+		},
+		{
+			// An int64 is not an sdf.RTracker at all, so extraction must fail.
+			name:    "not an rtracker",
+			inRt:    int64(42),
+			inSize:  1.0,
+			valid:   false,
+			expSize: -1.0,
+		},
+		{
+			name:    "non-float size",
+			inRt:    offsetrange.NewTracker(offsetrange.Restriction{Start: 0, End: 1}),
+			inSize:  int64(1),
+			valid:   false,
+			expSize: -1.0,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			root := constructRootFullValue(test.inRt, test.inSize)
+			tracker, size, ok := getBoundedRTrackerFromRoot(root)
+
+			if test.valid {
+				if !ok {
+					t.Fatalf("failed to get tracker and size from root")
+				}
+				if tracker == nil {
+					t.Errorf("got nil tracker, expected %#v", test.inRt)
+				}
+			} else {
+				if ok {
+					t.Errorf("invalid root returned ok")
+				}
+				if tracker != nil {
+					t.Errorf("got tracker %#v, want nil", tracker)
+				}
+			}
+			if !floatEquals(test.expSize, size, 0.001) {
+				// Report the expected float64, not the raw (possibly non-float) input size.
+				t.Errorf("got size %f, want %f", size, test.expSize)
+			}
+		})
+	}
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index d5c7a31b3e4..7b777771a8c 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -68,6 +68,7 @@ const (
URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"
URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
+	// URNTruncate identifies the SDF truncate-sized-restrictions transform; like the other URNs it ends in ":v1".
+	URNTruncate = "beam:transform:sdf_truncate_sized_restrictions:v1"
// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"
diff --git a/sdks/go/pkg/beam/core/sdf/sdf.go b/sdks/go/pkg/beam/core/sdf/sdf.go
index 2876d5985a2..9812d300e5b 100644
--- a/sdks/go/pkg/beam/core/sdf/sdf.go
+++ b/sdks/go/pkg/beam/core/sdf/sdf.go
@@ -72,6 +72,10 @@ type RTracker interface {
// the only split point is the end of the restriction, or the split failed for some recoverable
// reason), then this function returns nil as the residual.
//
+ // If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return either
+ // a nil primary or an RTracker that is both bounded and has size 0. This ensures that there is
+ // no data that is lost by not being rescheduled for execution later.
+ //
// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
TrySplit(fraction float64) (primary, residual interface{}, err error)
diff --git a/sdks/go/pkg/beam/core/sdf/wrappedbounded.go b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go
new file mode 100644
index 00000000000..36f44817a83
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+// WrappedTracker wraps an implementation of an RTracker and adds an IsBounded() function
+// that returns true in order to allow RTrackers to be handled as bounded BoundableRTrackers
+// if necessary (as in self-checkpointing evaluation).
+type WrappedTracker struct {
+	RTracker
+}
+
+// Compile-time check that *WrappedTracker satisfies BoundableRTracker.
+var _ BoundableRTracker = (*WrappedTracker)(nil)
+
+// IsBounded returns true, indicating that the underlying RTracker represents a bounded
+// amount of work.
+func (t *WrappedTracker) IsBounded() bool {
+	return true
+}
+
+// NewWrappedTracker is a constructor for an RTracker that wraps another RTracker into a
+// BoundableRTracker whose IsBounded() always reports true.
+func NewWrappedTracker(underlying RTracker) *WrappedTracker {
+	return &WrappedTracker{RTracker: underlying}
+}
diff --git a/sdks/go/test/integration/primitives/checkpointing.go b/sdks/go/test/integration/primitives/checkpointing.go
index f26b9f392ac..5b1079ad4ef 100644
--- a/sdks/go/test/integration/primitives/checkpointing.go
+++ b/sdks/go/test/integration/primitives/checkpointing.go
@@ -16,12 +16,14 @@
package primitives
import (
+ "context"
"reflect"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)
@@ -53,7 +55,7 @@ func (fn *selfCheckpointingDoFn) RestrictionSize(_ []byte, rest offsetrange.Rest
// SplitRestriction modifies the offsetrange.Restriction's sized restriction function to produce a size-zero restriction
// at the end of execution.
func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction {
- size := int64(1)
+ size := int64(10)
s := rest.Start
var splits []offsetrange.Restriction
for e := s + size; e <= rest.End; s, e = e, e+size {
@@ -68,19 +70,27 @@ func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Res
+// ProcessElement emits each claimed position of the restriction and self-checkpoints after every
+// 10 successful claims by returning a continuation that resumes in 1 second. It stops processing
+// on a restriction tracker error (which is logged) or once the tracker is done; if a claim fails
+// for any other reason it asks to be resumed in 5 seconds.
func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
position := rt.GetRestriction().(offsetrange.Restriction).Start
+ // Number of positions successfully claimed during this call; drives the periodic self-checkpoint below.
+ counter := 0
for {
if rt.TryClaim(position) {
// Successful claim, emit the value and move on.
emit(position)
position++
- return sdf.ResumeProcessingIn(1 * time.Second)
+ counter++
} else if rt.GetError() != nil || rt.IsDone() {
// Stop processing on error or completion
+ if err := rt.GetError(); err != nil {
+ log.Errorf(context.Background(), "error in restriction tracker, got %v", err)
+ }
return sdf.StopProcessing()
} else {
- // Failed to claim but no error, resume later.
+ // Failed to claim, but there is no error and the tracker is not done: resume later.
return sdf.ResumeProcessingIn(5 * time.Second)
}
+
+ // Self-checkpoint after 10 successful claims; the remaining restriction is resumed after 1 second.
+ if counter >= 10 {
+ return sdf.ResumeProcessingIn(1 * time.Second)
+ }
}
}