You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:23:31 UTC

[GitHub] [beam] youngoli commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

youngoli commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r430828940



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })

Review comment:
       I don't think that's safe because splits is `[]int64` instead of `[]int` (that's why I went with `sort.Slice` instead). If I'm wrong about that, then I'll switch it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org