You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/22 04:18:59 UTC

[GitHub] [beam] youngoli commented on a change in pull request #13160: [BEAM-11078, BEAM-2081] Add splittable DoFn documentation to programming guide

youngoli commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r509833618



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)
+enabling dynamic work rebalancing. To increase the rate at which initial parallelization of work occurs
+or for those runners that do not support runner-initiated splitting, we recommend providing
+a set of initial splits:
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+	size := 64 * (1 << 20)
+	i := rest.Start
+	for i < rest.End - size {
+		// Compute and output 64 MiB size ranges to process in parallel
+		end := i + size
+     		splits = append(splits, offsetrange.Restriction{i, end})
+		i = end
+	}
+	// Output the last range
+	splits = append(splits, offsetrange.Restriction{i, rest.End})
+	return splits
+}
+{{< /highlight >}}
+
+### 12.2 Sizing and progress {#sizing-and-progress}
+
+Sizing and progress are used during execution of a splittable DoFn to inform runners so that they may
+perform intelligent decisions about which restrictions to split and how to parallelize work.
+
+Before processing an element and restriction, an initial size may be used by a runner to choose
+how and who processes the restrictions attempting to improve initial balancing and parallelization
+of work. During the processing of an element and restriction, sizing and progress are used to choose
+which restrictions to split and who should process them.
+
+By default, we use the restriction tracker’s estimate for work remaining falling back to assuming
+that all restrictions have an equal cost. To override the default, SDF authors can provide the
+appropriate method within the restriction provider.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) RestrictionSize(filename string, rest offsetrange.Restriction) float64 {
+	weight := float64(1)
+	if strings.Contains(filename, “expensiveRecords”) {
+		weight = 2
+	}
+	return weight * rest.Size()

Review comment:
       Note that `rest.Size()` is a convenience function added to `offsetrange.Restriction` specifically, it's not part of the SDF API. It might be better to replace it with `rest.End - rest.Start` to avoid the expectation that all restrictions will have a size method.

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplify typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of work that would have been necessary to have been done when
+processing the element.
+
+Executing a splittable DoFn follows the following steps:
+
+1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
+2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g. the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)
+enabling dynamic work rebalancing. To increase the rate at which initial parallelization of work occurs
+or for those runners that do not support runner-initiated splitting, we recommend providing
+a set of initial splits:
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+	size := 64 * (1 << 20)
+	i := rest.Start
+	for i < rest.End - size {
+		// Compute and output 64 MiB size ranges to process in parallel
+		end := i + size
+     		splits = append(splits, offsetrange.Restriction{i, end})
+		i = end
+	}
+	// Output the last range
+	splits = append(splits, offsetrange.Restriction{i, rest.End})
+	return splits
+}
+{{< /highlight >}}
+
+### 12.2 Sizing and progress {#sizing-and-progress}
+
+Sizing and progress are used during execution of a splittable DoFn to inform runners so that they may
+perform intelligent decisions about which restrictions to split and how to parallelize work.
+
+Before processing an element and restriction, an initial size may be used by a runner to choose
+how and who processes the restrictions attempting to improve initial balancing and parallelization
+of work. During the processing of an element and restriction, sizing and progress are used to choose
+which restrictions to split and who should process them.
+
+By default, we use the restriction tracker’s estimate for work remaining falling back to assuming
+that all restrictions have an equal cost. To override the default, SDF authors can provide the
+appropriate method within the restriction provider.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) RestrictionSize(filename string, rest offsetrange.Restriction) float64 {
+	weight := float64(1)
+	if strings.Contains(filename, “expensiveRecords”) {
+		weight = 2
+	}
+	return weight * rest.Size()
+}
+{{< /highlight >}}
+
+### 12.3 User initiated checkpoint {#user-initiated-checkpoint}
+
+Some I/Os cannot produce all of the data necessary to complete a restriction within the lifetime of a
+single bundle. This typically happens with unbounded restrictions, but can also happen with bounded
+restrictions. For example, there could be more data that needs to be ingested but is not available yet.
+Another cause of this scenario is the source system throttling your data.
+
+Your splittable DoFn can signal to you that you are not done processing the current restriction. This
+signal can suggest a time to resume at. While the runner tries to honor the resume time, this is not
+guaranteed. This allows execution to continue on a restriction that has available work improving
+resource utilization.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_UserInitiatedCheckpoint >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_UserInitiatedCheckpoint >}}
+{{< /highlight >}}
+
+### 12.4 Runner-initiated split {#runner-initiated-split}
+
+A runner at any time may attempt to split a restriction while it is being processed. This allows the
+runner to either pause processing of the restriction so that other work may be done (common for
+unbounded restrictions to limit the amount of output and/or improve latency) or split the restriction
+into two pieces, increasing the available parallelism within the system. It is important to author a
+splittable DoFn with this in mind since the end of the restriction may change. Thus when writing the
+processing loop, it is important to use the result from trying to claim a piece of the restriction
+instead of assuming one can process till the end.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BadTryClaimLoop >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BadTryClaimLoop >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *badTryClaimLoop) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+            file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+
+	if err != nil {
+		return err
+	}
+
+	// The restriction tracker can be modified by another thread in parallel
+	// so storing state locally is ill advised.
+	end = rt.GetRestriction().(offsetrange.Restriction).End
+	for offset < end {
+		// Only after successfully claiming should we produce any output and/or
+		// perform side effects.
+    	rt.TryClaim(offset)
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
+
+### 12.5 Watermark estimation {#watermark-estimation}
+
+The default watermark estimator does not produce a watermark estimate. Therefore, the output watermark
+is solely computed by the minimum of upstream watermarks.
+
+As a splittable DoFn pr an element and restriction pair, it can advance the output watermark by specifying

Review comment:
       Typo: "As a splittable DoFn **pr** an element..."




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org