You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/22 17:27:26 UTC

[GitHub] [beam] lostluck commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

lostluck commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r429361145



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })

Review comment:
       Consider https://golang.org/pkg/sort/#Ints

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })
+		var prevDiff int64 = math.MaxInt64
+		var bestS int64 = -1
+		for _, s := range splits {

Review comment:
       Also consider https://golang.org/pkg/sort/#Search or even https://golang.org/pkg/sort/#SearchInts
   
   That said, feel free to benchmark your current implementation against one that uses the sort package's functions.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })
+		var prevDiff int64 = math.MaxInt64
+		var bestS int64 = -1
+		for _, s := range splits {
+			if s >= safeStart && s <= splitIdx {
+				diff := intAbs(fracIdx - s)
+				if diff <= prevDiff {
+					prevDiff = diff
+					bestS = s
+				} else {
+					break // Stop early if the difference starts increasing.
+				}
+			}
+		}
+		if bestS != -1 {
+			return bestS, nil
+		}
+	}
+	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, currIdx)

Review comment:
       Since the list of splits could be unbounded in size and may get truncated in logs, consider reordering the error message to print the current index and the current split point first, followed by the requested splits.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {

Review comment:
       The split request in harness.go needs to pass in the "estimated_input_elements" field from the proto, and that value needs to be taken into account.
   https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L386
   
   https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/harness.go#L313
   
   The problem with relying only on the current split index is that it starts at math.MaxInt64, which is generally far higher than the actual number of elements, so runners will likely be confused. You may have seen this field in the Python code passed around as "total_buffer_size" or similar.
   
   Other than that, this code looks like it behaves correctly, since the "remainder" would be the minimum of estimated_input_elements and the current split index.
   It does mean that plan.Split needs an API change to accept the exec.SplitPoints struct instead of the individual values. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org