Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/22 02:58:22 UTC

[GitHub] [beam] youngoli opened a new pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

youngoli opened a new pull request #11791:
URL: https://github.com/apache/beam/pull/11791


   Adds code to more closely align with the implementations of splitting in
   Python and Java. Note that not all cases are implemented. There is no
   measurement of sub-element progress yet, nor is there sub-element (SDF)
   splitting yet.
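A minimal sketch of the fraction-of-remainder arithmetic that splitHelper uses in this change, pulled out on its own for illustration. The function name fracIndex is hypothetical. Like the diff, it clamps frac to [0, 1], rounds to the nearest index within the remainder, and never returns index 0 or the element currently being processed.

```go
package main

import (
	"fmt"
	"math"
)

// fracIndex returns the index closest to frac of the remaining range
// [currIdx, splitIdx), bumped forward to the first safe index if needed.
// Hypothetical helper mirroring the arithmetic in splitHelper.
func fracIndex(currIdx, splitIdx int64, frac float64) int64 {
	// Clamp frac to [0, 1], as DataSource.Split does.
	if frac > 1.0 {
		frac = 1.0
	} else if frac < 0.0 {
		frac = 0.0
	}
	start := currIdx
	if start < 0 {
		start = 0
	}
	// Never split at 0 or at the element currently in progress.
	safeStart := start + 1
	idx := start + int64(math.Round(frac*float64(splitIdx-start)))
	if idx < safeStart {
		idx = safeStart
	}
	return idx
}

func main() {
	// 10 elements, processing element 2, split at half of the remainder:
	// 2 + round(0.5*8) = 6.
	fmt.Println(fracIndex(2, 10, 0.5))
	// A fraction of 0 still yields the first safe index, 3.
	fmt.Println(fracIndex(2, 10, 0.0))
}
```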
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #11791:
URL: https://github.com/apache/beam/pull/11791#issuecomment-632451757


   For reference, the tests I'm trying to match: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor_test.py#L61





[GitHub] [beam] lostluck commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r429361145



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })

Review comment:
       Consider https://golang.org/pkg/sort/#Ints
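Note that sort.Ints operates on []int, while splits here is an []int64, so it cannot be applied without a conversion. A sketch, using the hypothetical helper name sortSplits, that keeps sort.Slice but skips the sort when the input is already ordered, since the Split doc comment says callers pass a sorted set. On Go 1.21 or later, slices.Sort from the standard library also handles []int64 directly.

```go
package main

import (
	"fmt"
	"sort"
)

// sortSplits sorts split points in ascending order, in place. sort.Ints
// only accepts []int, so the []int64 slice is sorted with sort.Slice, and
// the sort is skipped entirely when the input is already ordered.
func sortSplits(splits []int64) {
	less := func(i, j int) bool { return splits[i] < splits[j] }
	if !sort.SliceIsSorted(splits, less) {
		sort.Slice(splits, less)
	}
}

func main() {
	splits := []int64{9, 3, 6}
	sortSplits(splits)
	fmt.Println(splits) // [3 6 9]
}
```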

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })
+		var prevDiff int64 = math.MaxInt64
+		var bestS int64 = -1
+		for _, s := range splits {

Review comment:
       Also consider https://golang.org/pkg/sort/#Search or even https://golang.org/pkg/sort/#SearchInts
   
   Though feel free to benchmark what you've got against an implementation that uses the sort package.
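One way to follow this suggestion, sketched under the assumption that splits is already sorted ascending: use sort.Search to narrow to the valid window [safeStart, splitIdx], then compare the two neighbours of fracIdx. sort.SearchInts needs an []int, so the generic sort.Search is used on the []int64 here. The name closestSplit is hypothetical, and ties go to the larger split point to mirror the `diff <= prevDiff` comparison in the loop under review.

```go
package main

import (
	"fmt"
	"sort"
)

// closestSplit returns the split point in the sorted slice splits that lies
// within [safeStart, splitIdx] and is nearest to fracIdx. The bool is false
// if no split point falls within that window.
func closestSplit(splits []int64, fracIdx, safeStart, splitIdx int64) (int64, bool) {
	// Narrow to the valid window with two binary searches.
	lo := sort.Search(len(splits), func(j int) bool { return splits[j] >= safeStart })
	hi := sort.Search(len(splits), func(j int) bool { return splits[j] > splitIdx })
	if lo >= hi {
		return 0, false
	}
	valid := splits[lo:hi]
	// Index of the first valid split point >= fracIdx.
	i := sort.Search(len(valid), func(j int) bool { return valid[j] >= fracIdx })
	switch {
	case i == len(valid):
		return valid[i-1], true
	case i == 0:
		return valid[0], true
	}
	// Compare the neighbours on either side of fracIdx; on a tie the larger
	// split point wins.
	if fracIdx-valid[i-1] < valid[i]-fracIdx {
		return valid[i-1], true
	}
	return valid[i], true
}

func main() {
	splits := []int64{2, 4, 7, 9}
	fmt.Println(closestSplit(splits, 5, 3, 10))   // 4 true
	fmt.Println(closestSplit(splits, 6, 3, 8))    // 7 true
	fmt.Println(closestSplit(splits, 12, 11, 20)) // 0 false
}
```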

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })
+		var prevDiff int64 = math.MaxInt64
+		var bestS int64 = -1
+		for _, s := range splits {
+			if s >= safeStart && s <= splitIdx {
+				diff := intAbs(fracIdx - s)
+				if diff <= prevDiff {
+					prevDiff = diff
+					bestS = s
+				} else {
+					break // Stop early if the difference starts increasing.
+				}
+			}
+		}
+		if bestS != -1 {
+			return bestS, nil
+		}
+	}
+	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, currIdx)

Review comment:
       Since the splits could be unbounded in size and get truncated by logging, consider changing the order to have the current index, and the current split point, and then the recommended splits.
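A sketch of the reordered message, using the hypothetical helper name splitError and illustrative wording: the two bounded integers come first so they survive even if the potentially long list of requested splits gets truncated in the logs.

```go
package main

import "fmt"

// splitError builds the failure message with the bounded fields first
// (current index, then the current split index) and the potentially long
// list of requested split points last.
func splitError(currIdx, splitIdx int64, splits []int64) error {
	return fmt.Errorf("failed to split DataSource (at index: %v, last index: %v) at requested splits: {%v}",
		currIdx, splitIdx, splits)
}

func main() {
	// Prints: failed to split DataSource (at index: 3, last index: 10) at requested splits: {[1 2]}
	fmt.Println(splitError(3, 10, []int64{1, 2}))
}
```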

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {

Review comment:
       The split request in harness.go needs to pass in the "estimated input elements" field from the proto, and that value needs to be taken into account.
   https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L386
   
   https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/harness.go#L313
   
   The problem with relying only on the current split index is that it starts at math.MaxInt64, which will generally be far higher than the actual number of elements, and runners will likely be confused. You might have seen this field passed around in the Python code as "total_buffer_size" or similar.
   
   Other than that, this code looks like it behaves correctly, since the "remainder" would be the min of estimated_input_elements and the current split index.
   It does mean that plan.Split needs an API change to accept the exec.SplitPoints struct instead of the individual values.







[GitHub] [beam] youngoli commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r431615426



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
+		if bestS != -1 {
+			return bestS, nil
+		}
+	}
+	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, currIdx)

Review comment:
       Done.







[GitHub] [beam] youngoli commented on pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #11791:
URL: https://github.com/apache/beam/pull/11791#issuecomment-632451211


   R: @lostluck 
   CC: @robertwb @lukecwik @boyuanzz 





[GitHub] [beam] youngoli commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r431615906



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {

Review comment:
       Done. I missed that in the original. Added that behavior and a test for it.
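   For reference, the fraction-to-index arithmetic in the `splitHelper` hunk quoted above can be run on its own. This is a minimal sketch, not the merged code: `fracSplitIndex` is a hypothetical name, and it only covers the `splits == nil` case, where every index is treated as a valid split point. The clamping of `frac` mirrors what `DataSource.Split` does before calling the helper.

```go
package main

import (
	"fmt"
	"math"
)

// fracSplitIndex is a hypothetical standalone version of the fraction-based
// part of splitHelper. It assumes every index is a valid split point (the
// splits == nil case) and that splitIdx is the end of the splittable range.
func fracSplitIndex(currIdx, splitIdx int64, frac float64) int64 {
	// Clamp frac to [0, 1], as DataSource.Split does before calling the helper.
	if frac > 1.0 {
		frac = 1.0
	} else if frac < 0.0 {
		frac = 0.0
	}
	// The remainder starts at the current progress, never below 0.
	var start int64
	if currIdx > start {
		start = currIdx
	}
	// Never split at 0 or at the current element.
	safeStart := start + 1
	// Round to the index closest to the requested fraction of the remainder.
	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
	if fracIdx < safeStart {
		fracIdx = safeStart
	}
	return fracIdx
}

func main() {
	// Range ends at 10, currently at index 2: half of the remaining 8 is 4.
	fmt.Println(fracSplitIndex(2, 10, 0.5)) // 6
	// A fraction of 0 is pushed up to the first safe index.
	fmt.Println(fracSplitIndex(2, 10, 0.0)) // 3
	// Out-of-range fractions are clamped to 1.
	fmt.Println(fracSplitIndex(2, 10, 1.5)) // 10
}
```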







[GitHub] [beam] lostluck commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r431277688



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })

Review comment:
       Whoops. Good point. Thanks!







[GitHub] [beam] youngoli commented on pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #11791:
URL: https://github.com/apache/beam/pull/11791#issuecomment-635615190


   Run Go PostCommit





[GitHub] [beam] youngoli commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r430828940



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })

Review comment:
       I don't think that's safe because splits is `[]int64` instead of `[]int` (that's why I went with `sort.Slice` instead). If I'm wrong about that, I'll switch it.







[GitHub] [beam] youngoli merged pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
youngoli merged pull request #11791:
URL: https://github.com/apache/beam/pull/11791


   





[GitHub] [beam] lostluck commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r431278374



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })
+		var prevDiff int64 = math.MaxInt64
+		var bestS int64 = -1
+		for _, s := range splits {

Review comment:
       Per your comment above, I made a type mistake, and the suggestion is invalid. Thanks!







[GitHub] [beam] youngoli commented on a change in pull request #11791: [BEAM-9935] Respect allowed split points and fraction in Go.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r431615308



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-	if splits == nil {
-		return 0, fmt.Errorf("failed to split: requested splits were empty")
-	}
 	if n == nil {
 		return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource not initialized", splits)
 	}
+	if frac > 1.0 {
+		frac = 1.0
+	} else if frac < 0.0 {
+		frac = 0.0
+	}
+
 	n.mu.Lock()
-	c := n.index
-	// Find the smallest split index that we haven't yet processed, and set
-	// the promised split index to this value.
-	for _, s := range splits {
-		// // Never split on the first element, or the current element.
-		if s > 0 && s > c && s <= n.splitIdx {
-			n.splitIdx = s
-			fs := n.splitIdx
-			n.mu.Unlock()
-			return fs, nil
-		}
+	s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+	if err != nil {
+		n.mu.Unlock()
+		return 0, err
 	}
+	n.splitIdx = s
+	fs := n.splitIdx
 	n.mu.Unlock()
-	// If we can't find a suitable split index from the requested choices,
-	// return an error.
-	return 0, fmt.Errorf("failed to split at requested splits: {%v}, DataSource at index: %v", splits, c)
+	return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) (int64, error) {
+	// Get split index from fraction. Find the closest index to the fraction of
+	// the remainder.
+	var start int64 = 0
+	if currIdx > start {
+		start = currIdx
+	}
+	// This is the first valid split index, since we should never split at 0 or
+	// at the current element.
+	safeStart := start + 1
+	// The remainder starts at our actual progress (i.e. start), but our final
+	// split index has to be >= our safeStart.
+	fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+	if fracIdx < safeStart {
+		fracIdx = safeStart
+	}
+	if splits == nil {
+		// All split points are valid so just split at fraction.
+		return fracIdx, nil
+	} else {
+		// Find the closest unprocessed split point to our fraction.
+		sort.Slice(splits, func(i, j int) bool { return splits[i] < splits[j] })
+		var prevDiff int64 = math.MaxInt64
+		var bestS int64 = -1
+		for _, s := range splits {

Review comment:
       Ack, although I think `sort.Search` can probably still be used with `int64`. Might still be worth benchmarking.
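   `sort.Search` takes an `int` length and an index-based predicate, so it does work on a `[]int64`. A minimal sketch of a binary-search replacement for the linear scan over `splits` might look like the following. `closestSplit`, its parameters, and the tie-breaking rule (prefer the lower split point) are assumptions for illustration, not the merged behavior, and the sketch presumes `splits` is already sorted ascending.

```go
package main

import (
	"fmt"
	"sort"
)

// closestSplit is a hypothetical binary-search variant of the linear scan in
// splitHelper. Given splits sorted ascending, it returns the split point s
// with safeStart <= s <= end that is closest to fracIdx, preferring the
// lower candidate on ties. ok is false if no split point is in range.
func closestSplit(splits []int64, fracIdx, safeStart, end int64) (best int64, ok bool) {
	if safeStart > end {
		return 0, false
	}
	// Clamp the target into the valid range so only its two neighbours in
	// the sorted slice can be the closest valid split point.
	if fracIdx < safeStart {
		fracIdx = safeStart
	} else if fracIdx > end {
		fracIdx = end
	}
	// i is the first position with splits[i] >= fracIdx (len(splits) if none).
	i := sort.Search(len(splits), func(j int) bool { return splits[j] >= fracIdx })
	// Lower neighbour: largest split below fracIdx, valid if >= safeStart.
	if i > 0 && splits[i-1] >= safeStart {
		best, ok = splits[i-1], true
	}
	// Upper neighbour: smallest split >= fracIdx, valid if <= end. It only
	// replaces the lower neighbour when strictly closer.
	if i < len(splits) && splits[i] <= end {
		if !ok || splits[i]-fracIdx < fracIdx-best {
			best, ok = splits[i], true
		}
	}
	return best, ok
}

func main() {
	splits := []int64{1, 4, 8, 12}
	fmt.Println(closestSplit(splits, 7, 3, 10))           // 8 true
	fmt.Println(closestSplit(splits, 6, 3, 10))           // 4 true (tie goes low)
	fmt.Println(closestSplit(splits, 6, 5, 10))           // 8 true
	fmt.Println(closestSplit([]int64{1, 2}, 6, 3, 10))    // 0 false
}
```

Whether this is faster than the linear scan depends on how many split points a runner typically sends; for short lists the scan may be just as fast, so a `testing.B` benchmark comparing the two would be the way to decide.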



