You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/21 17:47:13 UTC

[GitHub] [beam] lukecwik opened a new pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

lukecwik opened a new pull request #13160:
URL: https://github.com/apache/beam/pull/13160


   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/i
 con)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](htt
 ps://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam
 .apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/be
 am_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-713748772


   R: @rosetn The first commit is a straight translation of the programming guide portion of https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
   
   All future commits are changes to the website, filling in code snippets, etc...
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-713892223


   Run PythonDocker PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on a change in pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r509833618



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)
+enabling dynamic work rebalancing. To increase the rate at which initial parallelization of work occurs
+or for those runners that do not support runner-initiated splitting, we recommend providing
+a set of initial splits:
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+	size := 64 * (1 << 20)
+	i := rest.Start
+	for i < rest.End - size {
+		// Compute and output 64 MiB size ranges to process in parallel
+		end := i + size
+     		splits = append(splits, offsetrange.Restriction{i, end})
+		i = end
+	}
+	// Output the last range
+	splits = append(splits, offsetrange.Restriction{i, rest.End})
+	return splits
+}
+{{< /highlight >}}
+
+### 12.2 Sizing and progress {#sizing-and-progress}
+
+Sizing and progress are used during execution of a splittable DoFn to inform runners so that they may
+perform intelligent decisions about which restrictions to split and how to parallelize work.
+
+Before processing an element and restriction, an initial size may be used by a runner to choose
+how and who processes the restrictions attempting to improve initial balancing and parallelization
+of work. During the processing of an element and restriction, sizing and progress are used to choose
+which restrictions to split and who should process them.
+
+By default, we use the restriction tracker’s estimate for work remaining falling back to assuming
+that all restrictions have an equal cost. To override the default, SDF authors can provide the
+appropriate method within the restriction provider.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) RestrictionSize(filename string, rest offsetrange.Restriction) float64 {
+	weight := float64(1)
+	if strings.Contains(filename, “expensiveRecords”) {
+		weight = 2
+	}
+	return weight * rest.Size()

Review comment:
       Note that `rest.Size()` is a convenience function added to `offsetrange.Restriction` specifically, it's not part of the SDF API. It might be better to replace it with `rest.End - rest.Start` to avoid the expectation that all restrictions will have a size method.

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)
+enabling dynamic work rebalancing. To increase the rate at which initial parallelization of work occurs
+or for those runners that do not support runner-initiated splitting, we recommend providing
+a set of initial splits:
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+	size := 64 * (1 << 20)
+	i := rest.Start
+	for i < rest.End - size {
+		// Compute and output 64 MiB size ranges to process in parallel
+		end := i + size
+     		splits = append(splits, offsetrange.Restriction{i, end})
+		i = end
+	}
+	// Output the last range
+	splits = append(splits, offsetrange.Restriction{i, rest.End})
+	return splits
+}
+{{< /highlight >}}
+
+### 12.2 Sizing and progress {#sizing-and-progress}
+
+Sizing and progress are used during execution of a splittable DoFn to inform runners so that they may
+perform intelligent decisions about which restrictions to split and how to parallelize work.
+
+Before processing an element and restriction, an initial size may be used by a runner to choose
+how and who processes the restrictions attempting to improve initial balancing and parallelization
+of work. During the processing of an element and restriction, sizing and progress are used to choose
+which restrictions to split and who should process them.
+
+By default, we use the restriction tracker’s estimate for work remaining falling back to assuming
+that all restrictions have an equal cost. To override the default, SDF authors can provide the
+appropriate method within the restriction provider.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) RestrictionSize(filename string, rest offsetrange.Restriction) float64 {
+	weight := float64(1)
+	if strings.Contains(filename, “expensiveRecords”) {
+		weight = 2
+	}
+	return weight * rest.Size()
+}
+{{< /highlight >}}
+
+### 12.3 User initiated checkpoint {#user-initiated-checkpoint}
+
+Some I/Os cannot produce all of the data necessary to complete a restriction within the lifetime of a
+single bundle. This typically happens with unbounded restrictions, but can also happen with bounded
+restrictions. For example, there could be more data that needs to be ingested but is not available yet.
+Another cause of this scenario is the source system throttling your data.
+
+Your splittable DoFn can signal to you that you are not done processing the current restriction. This
+signal can suggest a time to resume at. While the runner tries to honor the resume time, this is not
+guaranteed. This allows execution to continue on a restriction that has available work improving
+resource utilization.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_UserInitiatedCheckpoint >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_UserInitiatedCheckpoint >}}
+{{< /highlight >}}
+
+### 12.4 Runner-initiated split {#runner-initiated-split}
+
+A runner at any time may attempt to split a restriction while it is being processed. This allows the
+runner to either pause processing of the restriction so that other work may be done (common for
+unbounded restrictions to limit the amount of output and/or improve latency) or split the restriction
+into two pieces, increasing the available parallelism within the system. It is important to author a
+splittable DoFn with this in mind since the end of the restriction may change. Thus when writing the
+processing loop, it is important to use the result from trying to claim a piece of the restriction
+instead of assuming one can process till the end.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BadTryClaimLoop >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BadTryClaimLoop >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *badTryClaimLoop) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+
+	// The restriction tracker can be modified by another thread in parallel
+	// so storing state locally is ill advised.
+	end = rt.GetRestriction().(offsetrange.Restriction).End
+	for offset < end {
+		// Only after successfully claiming should we produce any output and/or
+		// perform side effects.
+    	rt.TryClaim(offset)
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+### 12.5 Watermark estimation {#watermark-estimation}
+
+The default watermark estimator does not produce a watermark estimate. Therefore, the output watermark
+is solely computed by the minimum of upstream watermarks.
+
+As a splittable DoFn pr an element and restriction pair, it can advance the output watermark by specifying

Review comment:
       Typo: "As a splittable DoFn **pr** an element..."




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] rosetn commented on a change in pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
rosetn commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r510318072



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a

Review comment:
       The class `DoFn` should be in code font when it's within the text. I see that this is inconsistently applied in the programming guide, but let's make the changes in at least this section. https://developers.google.com/style/code-in-text

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the

Review comment:
       Add comma
   
   Traditionally, users

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}

Review comment:
       Also add them for each header. This stopped in the last two sections, but we can keep it consistent with the rest of the programming guide here.

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)
+enabling dynamic work rebalancing. To increase the rate at which initial parallelization of work occurs
+or for those runners that do not support runner-initiated splitting, we recommend providing
+a set of initial splits:
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+	size := 64 * (1 << 20)
+	i := rest.Start
+	for i < rest.End - size {
+		// Compute and output 64 MiB size ranges to process in parallel
+		end := i + size
+     		splits = append(splits, offsetrange.Restriction{i, end})
+		i = end
+	}
+	// Output the last range
+	splits = append(splits, offsetrange.Restriction{i, rest.End})
+	return splits
+}
+{{< /highlight >}}
+
+### 12.2 Sizing and progress {#sizing-and-progress}
+
+Sizing and progress are used during execution of a splittable DoFn to inform runners so that they may
+perform intelligent decisions about which restrictions to split and how to parallelize work.
+
+Before processing an element and restriction, an initial size may be used by a runner to choose
+how and who processes the restrictions attempting to improve initial balancing and parallelization
+of work. During the processing of an element and restriction, sizing and progress are used to choose
+which restrictions to split and who should process them.
+
+By default, we use the restriction tracker’s estimate for work remaining falling back to assuming
+that all restrictions have an equal cost. To override the default, SDF authors can provide the
+appropriate method within the restriction provider.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) RestrictionSize(filename string, rest offsetrange.Restriction) float64 {
+	weight := float64(1)
+	if strings.Contains(filename, “expensiveRecords”) {
+		weight = 2
+	}
+	return weight * rest.Size()
+}
+{{< /highlight >}}
+
+### 12.3 User initiated checkpoint {#user-initiated-checkpoint}

Review comment:
       User-initiated

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).

Review comment:
       Can you add more explanation to the "Checkpoint/split" bubble in the diagram on this list?

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a

Review comment:
       You can leave "SDF" in normal font

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}

Review comment:
       Missing period
   
   ## 12. Splittable `DoFns` {#splittable-dofns}




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-713746062


   Run Python_PVR_Flink PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-714089850


   Run Python PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] rosetn commented on a change in pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
rosetn commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r510487900



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,281 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12. Splittable `DoFns` {#splittable-dofns}
+
+A Splittable `DoFn` (SDF) enables users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally, users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular `DoFn` that read the file (decreased performance). With SDF,
+we bring the richness of Apache Beam’s I/O APIs to a `DoFn` enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1. SDF basics {#sdf-basics}
+
+At a high level, a SDF is responsible for processing element and restriction pairs. A

Review comment:
       Let's make these "an SDF"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r510373174



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)

Review comment:
       Consider pointing out that with *only* the above example code, the restrictions don't currently do anything extra WRT the same DoFn without the restriction handling. It's a non-splittable-dofn with vestigial extras.  (there's probably a much better way to phrase this). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik merged pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #13160:
URL: https://github.com/apache/beam/pull/13160


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on a change in pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r510520966



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)

Review comment:
       Ack SGTM.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-714237988


   The Python precommit failure is an existing issue: https://issues.apache.org/jira/browse/BEAM-11101
   
   Will need to wait till that is resolved so I can rerun that test.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-713772747


   @rosetn I kept the intro segment from the google doc in the programming guide as I didn't think jumping straight to high level overview made much sense.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r510399030



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)

Review comment:
       I think we should deal with this in the follow-up about trySplit being implemented correctly within a restriction tracker and adding that to the runner initiated splits section then lumping it in now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r510397882



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).

Review comment:
       Below the diagram I have the explanation for the checkpoint/split. I moved it to be part of the list.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-713899777


   Run Python PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-714658332


   Run Python PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-714031622


   Run Python PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-714031952


   Run Java PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #13160:
URL: https://github.com/apache/beam/pull/13160#issuecomment-714841000


   Run Java PreCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org