You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/01 18:20:12 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #17754: [BEAM-14511] Growable Tracker for Go SDK

lostluck commented on code in PR #17754:
URL: https://github.com/apache/beam/pull/17754#discussion_r887164550


##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,226 @@ func TestTracker_TrySplit(t *testing.T) {
 		})
 	}
 }
+
+type offsetRangeEndEstimator struct {
+	EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+	return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+	o.EstimateRangeEnd = rangeEnd
+}

Review Comment:
   Style nit: don't add a method just to set an exported field; simply set the exported field directly.



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +227,108 @@ func (tracker *Tracker) GetRestriction() interface{} {
 func (tracker *Tracker) IsBounded() bool {
 	return true
 }
+
+// RangeEndEstimator provides the estimated end offset of the range. Users must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+	// Estimate is called to get the end offset in TrySplit() functions.
+	//
+	// The end offset is exclusive for the range. The estimated end is not required to
+	// monotonically increase as it will only be taken into consideration when the
+	// estimated end offset is larger than the current position.
+	// Returning math.MaxInt64 as the estimate implies the largest possible position for the range
+	// is math.MaxInt64 - 1.
+	//
+	// Providing a good estimate is important for an accurate progress signal and will impact
+	// splitting decisions by the runner.
+	Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that this tracker makes
+// no assumptions about the positions of blocks within the range, so users must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+	Tracker
+	rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker creates a GrowableTracker for handling a growable offset range.
+// math.MaxInt64 is used as the end of the range to indicate infinity for an unbounded range.
+//
+// An OffsetRange is considered growable when the end offset could grow (or change)
+// during execution time (e.g. Kafka topic partition offset, appended file, ...).
+//
+// The growable range is marked as done by claiming math.MaxInt64-1.
+//
+// For bounded restrictions, this tracker works the same as offsetrange.Tracker.
+// Use that directly if you have no need of estimating the end of a bound.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator) (*GrowableTracker, error) {
+	if rangeEndEstimator == nil {
+		return nil, fmt.Errorf("param rangeEndEstimator cannot be nil. Implementing offsetrange.RangeEndEstimator may be required")
+	}
+	return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End: rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+	return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+	return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+	if x > y {
+		return x
+	}
+	return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual interface{}, err error) {
+	if tracker.stopped || tracker.IsDone() {
+		return tracker.rest, nil, nil
+	}
+
+	// If current tracking range is no longer growable, split it as a normal range.
+	if tracker.End() != math.MaxInt64 {
+		return tracker.Tracker.TrySplit(fraction)
+	}
+
+	// If current range has been done, there is no more space to split.
+	if tracker.attempted == math.MaxInt64 {
+		return nil, nil, nil
+	}
+
+	cur := max(tracker.attempted, tracker.Start()-1)
+	estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+	splitPt := cur + int64(math.Ceil(math.Max(1, float64(estimatedEnd-cur)*(fraction))))
+	if splitPt > estimatedEnd {
+		return tracker.rest, nil, nil
+	}
+	residual = Restriction{Start: splitPt, End: tracker.End()}
+	tracker.rest.End = splitPt
+	return tracker.rest, residual, nil
+}
+
+// GetProgress reports progress based on the claimed size and unclaimed sizes of the restriction.
+func (tracker *GrowableTracker) GetProgress() (done, remaining float64) {
+	// If current tracking range is no longer growable, get its progress as a normal range.
+	if tracker.End() != math.MaxInt64 {
+		return tracker.Tracker.GetProgress()
+	}
+
+	done = float64((tracker.claimed + 1) - tracker.Start())
+	remaining = float64(tracker.End() - (tracker.claimed + 1))
+	return done, remaining

Review Comment:
   This doesn't match the behavior of Java's GrowableOffsetRangeTracker. Progress should be computed from the estimated range end and the last attempted offset, but this code uses the unbounded end (math.MaxInt64) and the last claimed offset instead. https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java#L120
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org