You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/24 02:15:44 UTC

[GitHub] [beam] youngoli opened a new pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

youngoli opened a new pull request #11517:
URL: https://github.com/apache/beam/pull/11517


   There are still a few small changes I'm thinking of making to the user API frontend before I merge this in, so I may hold off on submitting this until I make some changes and can rebase this. But still sending it out for review since the vast majority of it will be unchanged.
   
   (I'm thinking of changing TrySplit to return two restrictions instead of one restriction tracker. I think both should theoretically work, but that approach is probably more intuitive. And I need to change GetProgress to return two floats, for finished and remaining work.)
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11517:
URL: https://github.com/apache/beam/pull/11517#discussion_r416170107



##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,81 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Work can only be split at element boundaries.
+//
+// Splittable DoFn Methods
+//
+// Making a splittable DoFn requires the following methods to be implemented on
+// a DoFn in addition to the usual DoFn requirements. In the following
+// method signatures `elem` represents the main input elements to the DoFn, and
+// should match the types used in ProcessElement. `restriction` represents the
+// user-defined restriction, and can be any type as long as it is consistent
+// throughout all the splittable DoFn methods:
+//
+// * `CreateInitialRestriction(element) restriction`
+//     CreateInitialRestriction creates an initial restriction encompassing an
+//     entire element. The restriction created stays associated with the element
+//     it describes.
+// * `SplitRestriction(elem, restriction) []restriction`
+//     SplitRestriction takes an element and its initial restriction, and
+//     optionally performs an initial split on it, returning a slice of all the
+//     split restrictions. If no splits are desired, the method returns a slice
+//     containing only the original restriction. This method will always be
+//     called on each newly created restriction before they are processed.
+// * `RestrictionSize(elem, restriction) float64`
+//     RestrictionSize returns a cheap size estimation for a restriction. This
+//     size is an abstract scalar value that represents how much work a
+//     restriction takes compared to other restrictions in the same DoFn. For
+//     example, a size of 200 represents twice as much work as a size of
+//     100, but the numbers do not represent anything on their own. Size is
+//     used by runners to estimate work for liquid sharding.
+// * `CreateTracker(restriction) sdf.RTracker`

Review comment:
       Ah, good point. It's supposed to be a concrete type implementing the sdf.RTracker interface. I'll update this to mention that explicitly.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11517:
URL: https://github.com/apache/beam/pull/11517#discussion_r416169731



##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,81 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Work can only be split at element boundaries.

Review comment:
       I was actually trying to say that during execution, work is only split along element boundaries. But even that is a bit misleading, because the elements in question are the initially split restrictions. So I'll probably either word it something like "This means that for runners that support liquid sharding, only the initially split restrictions can be distributed among workers, and stragglers cannot be split into smaller restrictions". Or maybe I'll keep it simple and just leave the first sentence.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #11517:
URL: https://github.com/apache/beam/pull/11517#issuecomment-618763745


   R: @lostluck 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11517:
URL: https://github.com/apache/beam/pull/11517#discussion_r416310647



##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,42 +13,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package sdf contains interfaces used specifically for splittable DoFns.

Review comment:
       Done, I prefer to call it experimental at the moment.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,81 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Work can only be split at element boundaries.
+//
+// Splittable DoFn Methods
+//
+// Making a splittable DoFn requires the following methods to be implemented on
+// a DoFn in addition to the usual DoFn requirements. In the following
+// method signatures `elem` represents the main input elements to the DoFn, and
+// should match the types used in ProcessElement. `restriction` represents the
+// user-defined restriction, and can be any type as long as it is consistent
+// throughout all the splittable DoFn methods:
+//
+// * `CreateInitialRestriction(element) restriction`
+//     CreateInitialRestriction creates an initial restriction encompassing an
+//     entire element. The restriction created stays associated with the element
+//     it describes.
+// * `SplitRestriction(elem, restriction) []restriction`
+//     SplitRestriction takes an element and its initial restriction, and
+//     optionally performs an initial split on it, returning a slice of all the
+//     split restrictions. If no splits are desired, the method returns a slice
+//     containing only the original restriction. This method will always be
+//     called on each newly created restriction before they are processed.
+// * `RestrictionSize(elem, restriction) float64`
+//     RestrictionSize returns a cheap size estimation for a restriction. This
+//     size is an abstract scalar value that represents how much work a
+//     restriction takes compared to other restrictions in the same DoFn. For
+//     example, a size of 200 represents twice as much work as a size of
+//     100, but the numbers do not represent anything on their own. Size is
+//     used by runners to estimate work for liquid sharding.
+// * `CreateTracker(restriction) sdf.RTracker`

Review comment:
       Done.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,81 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Work can only be split at element boundaries.

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11517:
URL: https://github.com/apache/beam/pull/11517#discussion_r417710871



##########
File path: sdks/go/examples/stringsplit/offsetrange/offsetrange.go
##########
@@ -89,21 +89,23 @@ func (tracker *Tracker) GetError() error {
 }
 
 // TrySplit splits at the nearest integer greater than the given fraction of the remainder.
-func (tracker *Tracker) TrySplit(fraction float64) (interface{}, error) {
+func (tracker *Tracker) TrySplit(fraction float64) (interface{}, interface{}, error) {
 	if tracker.Stopped || tracker.IsDone() {
-		return nil, nil
+		return tracker.Rest, nil, nil
 	}
-	if fraction < 0 || fraction > 1 {
-		return nil, errors.New("fraction must be in range [0, 1]")
+	if fraction < 0 {
+		fraction = 0

Review comment:
       Done.

##########
File path: sdks/go/examples/stringsplit/offsetrange/offsetrange.go
##########
@@ -89,21 +89,23 @@ func (tracker *Tracker) GetError() error {
 }
 
 // TrySplit splits at the nearest integer greater than the given fraction of the remainder.
-func (tracker *Tracker) TrySplit(fraction float64) (interface{}, error) {
+func (tracker *Tracker) TrySplit(fraction float64) (interface{}, interface{}, error) {

Review comment:
       Good point, done.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,87 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns (Experimental)
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Only initially split restrictions can be
+//   distributed by liquid sharding. Stragglers will not be split during
+//   execution with dynamic splitting.
+//
+// Splittable DoFn Methods
+//
+// Making a splittable DoFn requires the following methods to be implemented on
+// a DoFn in addition to the usual DoFn requirements. In the following
+// method signatures `elem` represents the main input elements to the DoFn, and
+// should match the types used in ProcessElement. `restriction` represents the
+// user-defined restriction, and can be any type as long as it is consistent
+// throughout all the splittable DoFn methods:
+//
+// * `CreateInitialRestriction(element) restriction`
+//     CreateInitialRestriction creates an initial restriction encompassing an
+//     entire element. The restriction created stays associated with the element
+//     it describes.
+// * `SplitRestriction(elem, restriction) []restriction`
+//     SplitRestriction takes an element and its initial restriction, and
+//     optionally performs an initial split on it, returning a slice of all the
+//     split restrictions. If no splits are desired, the method returns a slice
+//     containing only the original restriction. This method will always be
+//     called on each newly created restriction before they are processed.
+// * `RestrictionSize(elem, restriction) float64`
+//     RestrictionSize returns a cheap size estimation for a restriction. This
+//     size is an abstract scalar value that represents how much work a
+//     restriction takes compared to other restrictions in the same DoFn. For
+//     example, a size of 200 represents twice as much work as a size of
+//     100, but the numbers do not represent anything on their own. Size is
+//     used by runners to estimate work for liquid sharding.
+// * `CreateTracker(restriction) restrictionTracker`
+//     CreateTracker creates and returns a restriction tracker (a concrete type
+//     implementing `sdf.RTracker`) given a restriction. The restriction tracker

Review comment:
       Don't see why not. Done.

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -43,7 +43,7 @@ func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken
 		return "", nil
 	}
 
-  return StageViaLegacyApi(ctx, cc, binary, st)

Review comment:
       Came bundled in with `go fmt` but I'll move it to a separate PR.

##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-	// TryClaim attempts to claim the block of work in the current restriction located at a given
-	// position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-	// work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-	// work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-	// the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-	// any additional work or emitting any outputs.
-	//
-	// TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-	// returns a boolean indicating whether the claim succeeded.
+	// TryClaim attempts to claim the block of work located in the given position of the
+	// restriction. This method must be called in ProcessElement to claim work before it can be
+	// processed. Processing work without claiming it first can lead to incorrect output.
 	//
-	// If the claim fails due to an error, that error can be retrieved with GetError.
+	// If the claim is successful, the DoFn must process the entire block. If the claim is
+	// unsuccessful ProcessElement method of the DoFn must return without performing
+	// any additional work or emitting any outputs.
 	//
-	// For SDFs to work properly, claims must always be monotonically increasing in reference to the
-	// restriction's start and end points, and every block of work in a restriction must be claimed.
+	// If the claim fails due to an error, that error is stored and will be automatically emitted
+	// when the RTracker is validated, or can be manually retrieved with GetError.
 	//
 	// This pseudocode example illustrates the typical usage of TryClaim:
 	//
-	// 	pos = position of first block after restriction.start
+	// 	pos = position of first block within the restriction
 	// 	for TryClaim(pos) == true {
 	// 		// Do all work in the claimed block and emit outputs.
-	// 		pos = position of next block
+	// 		pos = position of next block within the restriction
 	// 	}
 	// 	return
 	TryClaim(pos interface{}) (ok bool)

Review comment:
       Good idea, done.

##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-	// TryClaim attempts to claim the block of work in the current restriction located at a given
-	// position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-	// work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-	// work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-	// the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-	// any additional work or emitting any outputs.
-	//
-	// TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-	// returns a boolean indicating whether the claim succeeded.
+	// TryClaim attempts to claim the block of work located in the given position of the
+	// restriction. This method must be called in ProcessElement to claim work before it can be
+	// processed. Processing work without claiming it first can lead to incorrect output.
 	//
-	// If the claim fails due to an error, that error can be retrieved with GetError.
+	// If the claim is successful, the DoFn must process the entire block. If the claim is
+	// unsuccessful ProcessElement method of the DoFn must return without performing
+	// any additional work or emitting any outputs.
 	//
-	// For SDFs to work properly, claims must always be monotonically increasing in reference to the
-	// restriction's start and end points, and every block of work in a restriction must be claimed.
+	// If the claim fails due to an error, that error is stored and will be automatically emitted
+	// when the RTracker is validated, or can be manually retrieved with GetError.
 	//
 	// This pseudocode example illustrates the typical usage of TryClaim:
 	//
-	// 	pos = position of first block after restriction.start
+	// 	pos = position of first block within the restriction
 	// 	for TryClaim(pos) == true {
 	// 		// Do all work in the claimed block and emit outputs.
-	// 		pos = position of next block
+	// 		pos = position of next block within the restriction
 	// 	}
 	// 	return
 	TryClaim(pos interface{}) (ok bool)
 
-	// GetError returns the error that made this RTracker stop executing, and it returns nil if no
-	// error occurred. If IsDone fails while validating this RTracker, this method will be
-	// called to log the error.
+	// GetError returns the error that made this RTracker stop executing, and returns nil if no
+	// error occurred. This is the error that is emitted if automated validation fails.
 	GetError() error
 
-	// TrySplit splits the current restriction into a primary and residual based on a fraction of the
-	// work remaining. The split is performed along the first valid split point located after the
-	// given fraction of the remainder. This method is called by the SDK harness when receiving a
-	// split request by the runner.
+	// TrySplit splits the current restriction into a primary (currently executing work) and
+	// residual (work to be split off) based on a fraction of work remaining. The split is performed
+	// at the first valid split point located after the given fraction of remaining work.
+	//
+	// For example, a fraction of 0.5 means to split at the halfway point of remaining work only. If
+	// 50% of work is done and 50% remaining, then a fraction of 0.5 would split after 75% of work.
+	//
+	// This method modifies the underlying restriction in the RTracker to reflect the primary. It
+	// then returns a copy of the newly modified restriction as a primary, and returns a new
+	// restriction for the residual. If the split would produce an empty residual (i.e. the only
+	// split point is the end of the restriction), then the returned residual is nil.
 	//
-	// The current restriction is split into two by modifying the current restriction's endpoint to
-	// turn it into the primary, and returning a new restriction tracker representing the residual.
-	// If no valid split point exists, this method returns nil instead of a residual, but does not
-	// return an error. If this method is unable to split due to some error then it returns nil and
-	// an error.
-	TrySplit(fraction float64) (residual interface{}, err error)
+	// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
+	TrySplit(fraction float64) (primary, residual interface{}, err error)

Review comment:
       Hmm, good point. It's definitely something a use should know, especially for writing future-proof SDFs. It would need to be in all the methods I think, except *maybe* GetError. I'm not completely sure off the top of my head though, I feel like it'll depend on the details of how we do dynamic splitting.
   
   For now I'll add a comment saying to make all methods thread-safe. Later we should add the thread-safe wrapper and recommend using that, and if we can dodge mutexes in some methods then we can implement it there.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11517:
URL: https://github.com/apache/beam/pull/11517#discussion_r416084895



##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,42 +13,36 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package sdf contains interfaces used specifically for splittable DoFns.

Review comment:
       Note, even if we submit this now-ish, we can still call it experimental until we have testing and some IO usage to bolster confidence.

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,81 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Work can only be split at element boundaries.
+//
+// Splittable DoFn Methods
+//
+// Making a splittable DoFn requires the following methods to be implemented on
+// a DoFn in addition to the usual DoFn requirements. In the following
+// method signatures `elem` represents the main input elements to the DoFn, and
+// should match the types used in ProcessElement. `restriction` represents the
+// user-defined restriction, and can be any type as long as it is consistent
+// throughout all the splittable DoFn methods:
+//
+// * `CreateInitialRestriction(element) restriction`
+//     CreateInitialRestriction creates an initial restriction encompassing an
+//     entire element. The restriction created stays associated with the element
+//     it describes.
+// * `SplitRestriction(elem, restriction) []restriction`
+//     SplitRestriction takes an element and its initial restriction, and
+//     optionally performs an initial split on it, returning a slice of all the
+//     split restrictions. If no splits are desired, the method returns a slice
+//     containing only the original restriction. This method will always be
+//     called on each newly created restriction before they are processed.
+// * `RestrictionSize(elem, restriction) float64`
+//     RestrictionSize returns a cheap size estimation for a restriction. This
+//     size is an abstract scalar value that represents how much work a
+//     restriction takes compared to other restrictions in the same DoFn. For
+//     example, a size of 200 represents twice as much work as a size of
+//     100, but the numbers do not represent anything on their own. Size is
+//     used by runners to estimate work for liquid sharding.
+// * `CreateTracker(restriction) sdf.RTracker`

Review comment:
       Is this supposed to be a concrete type or the literal sdf.RTracker interface?

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,81 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Work can only be split at element boundaries.

Review comment:
       I think in this case you mean 
   ```
   Work can only be split by the initial restriction boundaries.
   ```
   Since technically the initial split could turn every element into three restrictions or something, but those restrictions are at the "sub element" boundaries rather than in every element (eg. Each initial element is a filename, and the Initial split stats each file's size, and just provides restrictions once every 1000 bytes), and handles actually "empty restrictions" during processing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11517:
URL: https://github.com/apache/beam/pull/11517#discussion_r419635469



##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-	// TryClaim attempts to claim the block of work in the current restriction located at a given
-	// position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-	// work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-	// work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-	// the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-	// any additional work or emitting any outputs.
-	//
-	// TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-	// returns a boolean indicating whether the claim succeeded.
+	// TryClaim attempts to claim the block of work located in the given position of the
+	// restriction. This method must be called in ProcessElement to claim work before it can be
+	// processed. Processing work without claiming it first can lead to incorrect output.
 	//
-	// If the claim fails due to an error, that error can be retrieved with GetError.
+	// If the claim is successful, the DoFn must process the entire block. If the claim is
+	// unsuccessful ProcessElement method of the DoFn must return without performing
+	// any additional work or emitting any outputs.
 	//
-	// For SDFs to work properly, claims must always be monotonically increasing in reference to the
-	// restriction's start and end points, and every block of work in a restriction must be claimed.
+	// If the claim fails due to an error, that error is stored and will be automatically emitted
+	// when the RTracker is validated, or can be manually retrieved with GetError.
 	//
 	// This pseudocode example illustrates the typical usage of TryClaim:
 	//
-	// 	pos = position of first block after restriction.start
+	// 	pos = position of first block within the restriction
 	// 	for TryClaim(pos) == true {
 	// 		// Do all work in the claimed block and emit outputs.
-	// 		pos = position of next block
+	// 		pos = position of next block within the restriction
 	// 	}
 	// 	return
 	TryClaim(pos interface{}) (ok bool)
 
-	// GetError returns the error that made this RTracker stop executing, and it returns nil if no
-	// error occurred. If IsDone fails while validating this RTracker, this method will be
-	// called to log the error.
+	// GetError returns the error that made this RTracker stop executing, and returns nil if no
+	// error occurred. This is the error that is emitted if automated validation fails.
 	GetError() error
 
-	// TrySplit splits the current restriction into a primary and residual based on a fraction of the
-	// work remaining. The split is performed along the first valid split point located after the
-	// given fraction of the remainder. This method is called by the SDK harness when receiving a
-	// split request by the runner.
+	// TrySplit splits the current restriction into a primary (currently executing work) and
+	// residual (work to be split off) based on a fraction of work remaining. The split is performed
+	// at the first valid split point located after the given fraction of remaining work.
+	//
+	// For example, a fraction of 0.5 means to split at the halfway point of remaining work only. If
+	// 50% of work is done and 50% remaining, then a fraction of 0.5 would split after 75% of work.
+	//
+	// This method modifies the underlying restriction in the RTracker to reflect the primary. It
+	// then returns a copy of the newly modified restriction as a primary, and returns a new
+	// restriction for the residual. If the split would produce an empty residual (i.e. the only
+	// split point is the end of the restriction), then the returned residual is nil.
 	//
-	// The current restriction is split into two by modifying the current restriction's endpoint to
-	// turn it into the primary, and returning a new restriction tracker representing the residual.
-	// If no valid split point exists, this method returns nil instead of a residual, but does not
-	// return an error. If this method is unable to split due to some error then it returns nil and
-	// an error.
-	TrySplit(fraction float64) (residual interface{}, err error)
+	// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
+	TrySplit(fraction float64) (primary, residual interface{}, err error)

Review comment:
       Technically since GetError is going to be reading from a location that might be concurrently modified, it's required to be thread safe as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #11517: [BEAM-9643] Adding Go SDF Documentation.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11517:
URL: https://github.com/apache/beam/pull/11517#discussion_r416930600



##########
File path: sdks/go/examples/stringsplit/offsetrange/offsetrange.go
##########
@@ -89,21 +89,23 @@ func (tracker *Tracker) GetError() error {
 }
 
 // TrySplit splits at the nearest integer greater than the given fraction of the remainder.
-func (tracker *Tracker) TrySplit(fraction float64) (interface{}, error) {
+func (tracker *Tracker) TrySplit(fraction float64) (interface{}, interface{}, error) {
 	if tracker.Stopped || tracker.IsDone() {
-		return nil, nil
+		return tracker.Rest, nil, nil
 	}
-	if fraction < 0 || fraction > 1 {
-		return nil, errors.New("fraction must be in range [0, 1]")
+	if fraction < 0 {
+		fraction = 0

Review comment:
       Might be worth documenting this behavior in the comment.

##########
File path: sdks/go/examples/stringsplit/offsetrange/offsetrange.go
##########
@@ -89,21 +89,23 @@ func (tracker *Tracker) GetError() error {
 }
 
 // TrySplit splits at the nearest integer greater than the given fraction of the remainder.
-func (tracker *Tracker) TrySplit(fraction float64) (interface{}, error) {
+func (tracker *Tracker) TrySplit(fraction float64) (interface{}, interface{}, error) {

Review comment:
       Given this is the example, using named return values (primary, residual ...) is appropriate here for documentation purposes (but not so one can use an empty return.)

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,87 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns (Experimental)
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Only initially split restrictions can be
+//   distributed by liquid sharding. Stragglers will not be split during
+//   execution with dynamic splitting.
+//
+// Splittable DoFn Methods
+//
+// Making a splittable DoFn requires the following methods to be implemented on
+// a DoFn in addition to the usual DoFn requirements. In the following
+// method signatures `elem` represents the main input elements to the DoFn, and
+// should match the types used in ProcessElement. `restriction` represents the
+// user-defined restriction, and can be any type as long as it is consistent
+// throughout all the splittable DoFn methods:
+//
+// * `CreateInitialRestriction(element) restriction`
+//     CreateInitialRestriction creates an initial restriction encompassing an
+//     entire element. The restriction created stays associated with the element
+//     it describes.
+// * `SplitRestriction(elem, restriction) []restriction`
+//     SplitRestriction takes an element and its initial restriction, and
+//     optionally performs an initial split on it, returning a slice of all the
+//     split restrictions. If no splits are desired, the method returns a slice
+//     containing only the original restriction. This method will always be
+//     called on each newly created restriction before they are processed.
+// * `RestrictionSize(elem, restriction) float64`
+//     RestrictionSize returns a cheap size estimation for a restriction. This
+//     size is an abstract scalar value that represents how much work a
+//     restriction takes compared to other restrictions in the same DoFn. For
+//     example, a size of 200 represents twice as much work as a size of
+//     100, but the numbers do not represent anything on their own. Size is
+//     used by runners to estimate work for liquid sharding.
+// * `CreateTracker(restriction) restrictionTracker`
+//     CreateTracker creates and returns a restriction tracker (a concrete type
+//     implementing `sdf.RTracker`) given a restriction. The restriction tracker

Review comment:
       Consider being explicit about sdf.RTracker being an interface.
   eg... implementing the `sdf.RTracker` interface.

##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-	// TryClaim attempts to claim the block of work in the current restriction located at a given
-	// position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-	// work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-	// work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-	// the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-	// any additional work or emitting any outputs.
-	//
-	// TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-	// returns a boolean indicating whether the claim succeeded.
+	// TryClaim attempts to claim the block of work located in the given position of the
+	// restriction. This method must be called in ProcessElement to claim work before it can be
+	// processed. Processing work without claiming it first can lead to incorrect output.
 	//
-	// If the claim fails due to an error, that error can be retrieved with GetError.
+	// If the claim is successful, the DoFn must process the entire block. If the claim is
+	// unsuccessful ProcessElement method of the DoFn must return without performing
+	// any additional work or emitting any outputs.
 	//
-	// For SDFs to work properly, claims must always be monotonically increasing in reference to the
-	// restriction's start and end points, and every block of work in a restriction must be claimed.
+	// If the claim fails due to an error, that error is stored and will be automatically emitted
+	// when the RTracker is validated, or can be manually retrieved with GetError.
 	//
 	// This pseudocode example illustrates the typical usage of TryClaim:
 	//
-	// 	pos = position of first block after restriction.start
+	// 	pos = position of first block within the restriction
 	// 	for TryClaim(pos) == true {
 	// 		// Do all work in the claimed block and emit outputs.
-	// 		pos = position of next block
+	// 		pos = position of next block within the restriction
 	// 	}
 	// 	return
 	TryClaim(pos interface{}) (ok bool)
 
-	// GetError returns the error that made this RTracker stop executing, and it returns nil if no
-	// error occurred. If IsDone fails while validating this RTracker, this method will be
-	// called to log the error.
+	// GetError returns the error that made this RTracker stop executing, and returns nil if no
+	// error occurred. This is the error that is emitted if automated validation fails.
 	GetError() error
 
-	// TrySplit splits the current restriction into a primary and residual based on a fraction of the
-	// work remaining. The split is performed along the first valid split point located after the
-	// given fraction of the remainder. This method is called by the SDK harness when receiving a
-	// split request by the runner.
+	// TrySplit splits the current restriction into a primary (currently executing work) and
+	// residual (work to be split off) based on a fraction of work remaining. The split is performed
+	// at the first valid split point located after the given fraction of remaining work.
+	//
+	// For example, a fraction of 0.5 means to split at the halfway point of remaining work only. If
+	// 50% of work is done and 50% remaining, then a fraction of 0.5 would split after 75% of work.

Review comment:
       +1 to this concrete example.

##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-	// TryClaim attempts to claim the block of work in the current restriction located at a given
-	// position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-	// work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-	// work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-	// the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-	// any additional work or emitting any outputs.
-	//
-	// TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-	// returns a boolean indicating whether the claim succeeded.
+	// TryClaim attempts to claim the block of work located in the given position of the
+	// restriction. This method must be called in ProcessElement to claim work before it can be
+	// processed. Processing work without claiming it first can lead to incorrect output.
 	//
-	// If the claim fails due to an error, that error can be retrieved with GetError.
+	// If the claim is successful, the DoFn must process the entire block. If the claim is
+	// unsuccessful ProcessElement method of the DoFn must return without performing
+	// any additional work or emitting any outputs.
 	//
-	// For SDFs to work properly, claims must always be monotonically increasing in reference to the
-	// restriction's start and end points, and every block of work in a restriction must be claimed.
+	// If the claim fails due to an error, that error is stored and will be automatically emitted
+	// when the RTracker is validated, or can be manually retrieved with GetError.
 	//
 	// This pseudocode example illustrates the typical usage of TryClaim:
 	//
-	// 	pos = position of first block after restriction.start
+	// 	pos = position of first block within the restriction
 	// 	for TryClaim(pos) == true {
 	// 		// Do all work in the claimed block and emit outputs.
-	// 		pos = position of next block
+	// 		pos = position of next block within the restriction
 	// 	}
 	// 	return
 	TryClaim(pos interface{}) (ok bool)
 
-	// GetError returns the error that made this RTracker stop executing, and it returns nil if no
-	// error occurred. If IsDone fails while validating this RTracker, this method will be
-	// called to log the error.
+	// GetError returns the error that made this RTracker stop executing, and returns nil if no
+	// error occurred. This is the error that is emitted if automated validation fails.
 	GetError() error
 
-	// TrySplit splits the current restriction into a primary and residual based on a fraction of the
-	// work remaining. The split is performed along the first valid split point located after the
-	// given fraction of the remainder. This method is called by the SDK harness when receiving a
-	// split request by the runner.
+	// TrySplit splits the current restriction into a primary (currently executing work) and
+	// residual (work to be split off) based on a fraction of work remaining. The split is performed
+	// at the first valid split point located after the given fraction of remaining work.
+	//
+	// For example, a fraction of 0.5 means to split at the halfway point of remaining work only. If
+	// 50% of work is done and 50% remaining, then a fraction of 0.5 would split after 75% of work.
+	//
+	// This method modifies the underlying restriction in the RTracker to reflect the primary. It
+	// then returns a copy of the newly modified restriction as a primary, and returns a new
+	// restriction for the residual. If the split would produce an empty residual (i.e. the only
+	// split point is the end of the restriction), then the returned residual is nil.
 	//
-	// The current restriction is split into two by modifying the current restriction's endpoint to
-	// turn it into the primary, and returning a new restriction tracker representing the residual.
-	// If no valid split point exists, this method returns nil instead of a residual, but does not
-	// return an error. If this method is unable to split due to some error then it returns nil and
-	// an error.
-	TrySplit(fraction float64) (residual interface{}, err error)
+	// If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
+	TrySplit(fraction float64) (primary, residual interface{}, err error)

Review comment:
       IIRC for dynamic splitting, this is the one that requires the Tracker to be concurrency safe? Do we want to declared ahead of time that implementations of RTracker must be threadsafe?

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -43,7 +43,7 @@ func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken
 		return "", nil
 	}
 
-  return StageViaLegacyApi(ctx, cc, binary, st)

Review comment:
       This cleanup is  separate from the rest of the change.

##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-	// TryClaim attempts to claim the block of work in the current restriction located at a given
-	// position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-	// work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-	// work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-	// the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-	// any additional work or emitting any outputs.
-	//
-	// TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-	// returns a boolean indicating whether the claim succeeded.
+	// TryClaim attempts to claim the block of work located in the given position of the
+	// restriction. This method must be called in ProcessElement to claim work before it can be
+	// processed. Processing work without claiming it first can lead to incorrect output.
 	//
-	// If the claim fails due to an error, that error can be retrieved with GetError.
+	// If the claim is successful, the DoFn must process the entire block. If the claim is
+	// unsuccessful ProcessElement method of the DoFn must return without performing
+	// any additional work or emitting any outputs.
 	//
-	// For SDFs to work properly, claims must always be monotonically increasing in reference to the
-	// restriction's start and end points, and every block of work in a restriction must be claimed.
+	// If the claim fails due to an error, that error is stored and will be automatically emitted
+	// when the RTracker is validated, or can be manually retrieved with GetError.
 	//
 	// This pseudocode example illustrates the typical usage of TryClaim:
 	//
-	// 	pos = position of first block after restriction.start
+	// 	pos = position of first block within the restriction
 	// 	for TryClaim(pos) == true {
 	// 		// Do all work in the claimed block and emit outputs.
-	// 		pos = position of next block
+	// 		pos = position of next block within the restriction
 	// 	}
 	// 	return
 	TryClaim(pos interface{}) (ok bool)

Review comment:
       It would be worth explicitly saying that the position type is related directly to the type of Restriction being handled. 
   
   Eg.
   A linear offset restriction could use a single int64 value to represent a position. Similarly a multi dimensional restriction space could use a more complex type to represent the area claimed as a position.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org