You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/19 22:37:16 UTC

[beam] branch master updated: [BEAM-14484] Improve behavior surrounding primary roots in self-checkpointing (#17716)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f1980dc917c [BEAM-14484] Improve behavior surrounding primary roots in self-checkpointing (#17716)
f1980dc917c is described below

commit f1980dc917c939a9c8178de62daeda1405032701
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu May 19 18:37:10 2022 -0400

    [BEAM-14484] Improve behavior surrounding primary roots in self-checkpointing (#17716)
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   | 43 +++++++++++++-
 .../pkg/beam/core/runtime/exec/datasource_test.go  | 69 ++++++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  1 +
 sdks/go/pkg/beam/core/sdf/sdf.go                   |  4 ++
 sdks/go/pkg/beam/core/sdf/wrappedbounded.go        | 34 +++++++++++
 .../test/integration/primitives/checkpointing.go   | 16 ++++-
 6 files changed, 162 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index e47fee00c96..8c39aeaed41 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -29,6 +29,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 )
 
 // DataSource is a Root execution unit.
@@ -348,6 +349,46 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
 	return encodeElms
 }
 
+// getBoundedRTrackerFromRoot unpacks a split root, expected to be shaped
+// KV<KV<Elm, KV<RTracker, watermarkEstimatorState>>, Size>, into its restriction tracker
+// and size. A tracker that does not implement sdf.BoundableRTracker is wrapped with
+// sdf.NewWrappedTracker and therefore treated as bounded. If root does not have the
+// expected shape, a warning is logged and (nil, -1.0, false) is returned instead of panicking.
+func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
+	if root == nil {
+		log.Warn(context.Background(), "expected a non-nil root")
+		return nil, -1.0, false
+	}
+	elm, ok := root.Elm.(*FullValue)
+	if !ok || elm == nil {
+		log.Warnf(context.Background(), "expected root element to be type *FullValue, got type %T", root.Elm)
+		return nil, -1.0, false
+	}
+	trackerAndState, ok := elm.Elm2.(*FullValue)
+	if !ok || trackerAndState == nil {
+		log.Warnf(context.Background(), "expected KV<RTracker, watermarkEstimatorState> to be type *FullValue, got type %T", elm.Elm2)
+		return nil, -1.0, false
+	}
+	tElm := trackerAndState.Elm
+	tracker, ok := tElm.(sdf.RTracker)
+	if !ok {
+		log.Warnf(context.Background(), "expected type sdf.RTracker, got type %T", tElm)
+		return nil, -1.0, false
+	}
+	boundTracker, ok := tracker.(sdf.BoundableRTracker)
+	if !ok {
+		log.Warn(context.Background(), "expected type sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()")
+		// Assume an RTracker that does not implement IsBounded() will always be bounded, wrap so it can be used.
+		boundTracker = sdf.NewWrappedTracker(tracker)
+	}
+	size, ok := root.Elm2.(float64)
+	if !ok {
+		log.Warnf(context.Background(), "expected size to be type float64, got type %T", root.Elm2)
+		return nil, -1.0, false
+	}
+	return boundTracker, size, true
+}
+
 // Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
 // ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
 // splittable or has not returned a resuming continuation, the function returns an empty
@@ -366,13 +388,30 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
 	ow := su.GetOutputWatermark()
 
-	// Always split at fraction 0.0, should have no primaries left.
+	// Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+	// will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+	// that are bounded and of size 0 as they represent no remaining work.
 	ps, rs, err := su.Split(0.0)
 	if err != nil {
 		return SplitResult{}, -1 * time.Minute, false, err
 	}
+	if len(rs) == 0 {
+		return SplitResult{}, -1 * time.Minute, false, nil
+	}
 	if len(ps) != 0 {
-		return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+		// Each root is expected to be KV<KV<Elm, KV<RTracker, watermarkEstimatorState>>, Size>.
+		const zeroSizeTolerance = 0.00001 // Sizes at or below this are treated as no remaining work.
+		for _, root := range ps {
+			tracker, size, ok := getBoundedRTrackerFromRoot(root)
+			if !ok {
+				// The root could not be inspected; log it and move on rather than failing the checkpoint.
+				log.Warnf(context.Background(), "got unexpected primary root contents %v, please check the output of the restriction tracker's TrySplit() function", root)
+				continue
+			}
+			if !tracker.IsBounded() || size > zeroSizeTolerance {
+				return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got primary roots %v, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps)
+			}
+		}
 	}
 
 	encodeElms := n.makeEncodeElms()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index bf367ae1cb4..365cf52062f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
 	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
@@ -914,3 +915,71 @@ func validateSource(t *testing.T, out *CaptureNode, source *DataSource, expected
 		t.Errorf("DataSource => %#v, want %#v", extractValues(out.Elements...), extractValues(expected...))
 	}
 }
+
+// constructRootFullValue builds a root shaped like a sized split root,
+// KV<KV<nil, KV<rt, nil>>, size>, for exercising getBoundedRTrackerFromRoot.
+func constructRootFullValue(rt, size interface{}) *FullValue {
+	// Innermost: KV<RTracker, watermarkEstimatorState>.
+	trackerKV := &FullValue{Elm: rt}
+	// Middle: KV<Elm, KV<RTracker, watermarkEstimatorState>>.
+	elmKV := &FullValue{Elm2: trackerKV}
+	// Outermost: KV<..., Size>.
+	return &FullValue{Elm: elmKV, Elm2: size}
+}
+
+// TestGetRTrackerFromRoot checks how getBoundedRTrackerFromRoot handles valid and malformed roots.
+func TestGetRTrackerFromRoot(t *testing.T) {
+	tests := []struct {
+		name    string
+		inRt    interface{}
+		inSize  interface{}
+		valid   bool
+		expSize float64
+	}{
+		{
+			"valid",
+			offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}),
+			1.0,
+			true,
+			1.0,
+		},
+		{
+			"not an rtracker",
+			int64(42),
+			1.0,
+			false,
+			-1.0,
+		},
+		{
+			"non-float size",
+			offsetrange.NewTracker(offsetrange.Restriction{Start: int64(0), End: int64(1)}),
+			int64(1),
+			false,
+			-1.0,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			root := constructRootFullValue(test.inRt, test.inSize)
+			tracker, size, ok := getBoundedRTrackerFromRoot(root)
+			if test.valid {
+				if !ok {
+					t.Fatal("failed to get tracker and size from root")
+				}
+				if tracker == nil {
+					t.Errorf("got nil tracker, expected %#v", test.inRt)
+				}
+			} else {
+				if ok {
+					t.Errorf("invalid root returned ok")
+				}
+				if tracker != nil {
+					t.Errorf("got tracker %#v, want nil", tracker)
+				}
+			}
+			if !floatEquals(test.expSize, size, 0.001) {
+				t.Errorf("got size %f, want %f", size, test.expSize)
+			}
+		})
+	}
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index d5c7a31b3e4..7b777771a8c 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -68,6 +68,7 @@ const (
 
 	URNRequiresSplittableDoFn     = "beam:requirement:pardo:splittable_dofn:v1"
 	URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
+	URNTruncate                   = "beam:transform:sdf_truncate_sized_restrictions:v1"
 
 	// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
 	URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"
diff --git a/sdks/go/pkg/beam/core/sdf/sdf.go b/sdks/go/pkg/beam/core/sdf/sdf.go
index 2876d5985a2..9812d300e5b 100644
--- a/sdks/go/pkg/beam/core/sdf/sdf.go
+++ b/sdks/go/pkg/beam/core/sdf/sdf.go
@@ -72,6 +72,10 @@ type RTracker interface {
 	// the only split point is the end of the restriction, or the split failed for some recoverable
 	// reason), then this function returns nil as the residual.
 	//
+	// If the split fraction is 0 (e.g. a self-checkpointing split), TrySplit() should return either
+	// a nil primary or a primary restriction that is both bounded and of size 0. This ensures that
+	// no data is lost by failing to be rescheduled for later execution.
+	//
 	// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
 	TrySplit(fraction float64) (primary, residual interface{}, err error)
 
diff --git a/sdks/go/pkg/beam/core/sdf/wrappedbounded.go b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go
new file mode 100644
index 00000000000..36f44817a83
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+// WrappedTracker embeds an RTracker and adds an IsBounded() method that always
+// returns true, so RTrackers lacking IsBounded() can be handled as BoundableRTrackers
+// where needed (e.g. when evaluating a self-checkpoint).
+type WrappedTracker struct {
+	RTracker
+}
+
+// IsBounded always reports true, so the wrapped RTracker's restriction is
+// treated as a bounded amount of work.
+func (w *WrappedTracker) IsBounded() bool {
+	return true
+}
+
+// NewWrappedTracker returns a WrappedTracker around underlying so it can be used as a BoundableRTracker.
+func NewWrappedTracker(underlying RTracker) *WrappedTracker {
+	return &WrappedTracker{RTracker: underlying}
+}
diff --git a/sdks/go/test/integration/primitives/checkpointing.go b/sdks/go/test/integration/primitives/checkpointing.go
index f26b9f392ac..5b1079ad4ef 100644
--- a/sdks/go/test/integration/primitives/checkpointing.go
+++ b/sdks/go/test/integration/primitives/checkpointing.go
@@ -16,12 +16,14 @@
 package primitives
 
 import (
+	"context"
 	"reflect"
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 )
 
@@ -53,7 +55,7 @@ func (fn *selfCheckpointingDoFn) RestrictionSize(_ []byte, rest offsetrange.Rest
 // SplitRestriction modifies the offsetrange.Restriction's sized restriction function to produce a size-zero restriction
 // at the end of execution.
 func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction {
-	size := int64(1)
+	size := int64(10)
 	s := rest.Start
 	var splits []offsetrange.Restriction
 	for e := s + size; e <= rest.End; s, e = e, e+size {
@@ -68,19 +70,27 @@ func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Res
 func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
 	position := rt.GetRestriction().(offsetrange.Restriction).Start
 
+	counter := 0 // Positions successfully claimed (and emitted) during this call.
 	for {
 		if rt.TryClaim(position) {
 			// Successful claim, emit the value and move on.
 			emit(position)
 			position++
-			return sdf.ResumeProcessingIn(1 * time.Second)
+			counter++
 		} else if rt.GetError() != nil || rt.IsDone() {
 			// Stop processing on error or completion
+			if err := rt.GetError(); err != nil { // ProcessElement has no error return, so surface the error in the logs.
+				log.Errorf(context.Background(), "error in restriction tracker, got %v", err)
+			}
 			return sdf.StopProcessing()
 		} else {
-			// Failed to claim but no error, resume later.
+			// The claim failed but the tracker is neither errored nor done; resume later.
 			return sdf.ResumeProcessingIn(5 * time.Second)
 		}
+
+		if counter >= 10 { // Self-checkpoint after claiming 10 positions in this call.
+			return sdf.ResumeProcessingIn(1 * time.Second)
+		}
 	}
 }