Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/22 17:46:42 UTC

[GitHub] [beam] rosetn commented on a change in pull request #13160: [BEAM-11078] Add splittable DoFn documentation to programming guide

rosetn commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r510318072



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplifies typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns, enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of the work that would otherwise have been done when processing
+the element.
+
+Executing a splittable DoFn follows these steps:
+
+1. Each element is paired with a restriction (e.g., a filename is paired with an offset range representing the whole file).
+2. Each element and restriction pair is split (e.g., offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g., the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a

Review comment:
       The class `DoFn` should be in code font when it's within the text. I see that this is inconsistently applied in the programming guide, but let's make the changes in at least this section. https://developers.google.com/style/code-in-text

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplifies typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the

Review comment:
       Add comma
   
   Traditionally, users

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}

Review comment:
       Also add them for each header. This stopped in the last two sections, but we can keep it consistent with the rest of the programming guide here.

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplifies typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns, enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of the work that would otherwise have been done when processing
+the element.
+
+Executing a splittable DoFn follows these steps:
+
+1. Each element is paired with a restriction (e.g., a filename is paired with an offset range representing the whole file).
+2. Each element and restriction pair is split (e.g., offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g., the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction is used to represent a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+this is the DoFn. [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+what subset of the restriction has been completed during processing.
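+
+To make the tracker's role concrete, the following is a minimal, hypothetical sketch (not the Go
+SDK's actual `offsetrange` tracker) of the claim-based contract that trackers follow: work for a
+position may only be performed after that position has been successfully claimed.
+
+{{< highlight go >}}
+// toyOffsetTracker is an illustration only; real pipelines use offsetrange.NewTracker.
+type toyOffsetTracker struct {
+	start, end int64 // the restriction being tracked
+	claimed    int64 // the last offset that was successfully claimed
+}
+
+// TryClaim returns true while the requested offset still lies inside the
+// restriction; once it returns false, ProcessElement must stop doing work.
+func (t *toyOffsetTracker) TryClaim(offset int64) bool {
+	if offset >= t.end {
+		return false
+	}
+	t.claimed = offset
+	return true
+}
+{{< /highlight >}}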
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to initialize an initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+	file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	// Close the file once this element and restriction pair has been processed.
+	defer file.Close()
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}
+{{< /highlight >}}
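+
+As a usage sketch (assuming a hypothetical pipeline scope `s` and a PCollection of filenames named
+`fileNames`), the splittable DoFn above is applied like any other DoFn:
+
+{{< highlight go >}}
+// The runner drives pairing, splitting, and tracking of restrictions for us.
+records := beam.ParDo(s, &splittableDoFn{}, fileNames)
+{{< /highlight >}}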
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split),
+enabling dynamic work rebalancing. To increase the rate at which work is initially parallelized, or
+for runners that do not support runner-initiated splitting, we recommend providing a set of
+initial splits:
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExampleWithSplitting >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) SplitRestriction(filename string, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+	// 64 MiB; declared as an untyped constant so it combines with the int64 offsets below.
+	const size = 64 * (1 << 20)
+	i := rest.Start
+	for i < rest.End-size {
+		// Compute and output 64 MiB size ranges to process in parallel.
+		end := i + size
+		splits = append(splits, offsetrange.Restriction{Start: i, End: end})
+		i = end
+	}
+	// Output the last range.
+	splits = append(splits, offsetrange.Restriction{Start: i, End: rest.End})
+	return splits
+}
+{{< /highlight >}}
+
+### 12.2 Sizing and progress {#sizing-and-progress}
+
+Sizing and progress are used during execution of a splittable DoFn to inform runners so that they
+can make intelligent decisions about which restrictions to split and how to parallelize work.
+
+Before processing an element and restriction, a runner may use an initial size to choose how and
+by whom the restrictions are processed, attempting to improve the initial balancing and
+parallelization of work. During the processing of an element and restriction, sizing and progress
+are used to choose which restrictions to split and who should process them.
+
+By default, we use the restriction tracker’s estimate for work remaining, falling back to assuming
+that all restrictions have an equal cost. To override the default, SDF authors can provide the
+appropriate method within the restriction provider.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_GetSize >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
+func (fn *splittableDoFn) RestrictionSize(filename string, rest offsetrange.Restriction) float64 {
+	weight := float64(1)
+	if strings.Contains(filename, "expensiveRecords") {
+		weight = 2
+	}
+	return weight * rest.Size()
+}
+{{< /highlight >}}
+
+### 12.3 User initiated checkpoint {#user-initiated-checkpoint}

Review comment:
       User-initiated

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplifies typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns, enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of the work that would otherwise have been done when processing
+the element.
+
+Executing a splittable DoFn follows these steps:
+
+1. Each element is paired with a restriction (e.g., a filename is paired with an offset range representing the whole file).
+2. Each element and restriction pair is split (e.g., offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g., the file is read).

Review comment:
       Can you add more explanation to the "Checkpoint/split" bubble in the diagram on this list?

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplifies typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue followed by parsing those
+files. Traditionally users were required to either write a single I/O connector that contained the
+logic for the message queue and the file reader (increased complexity) or choose to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns, enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of the work that would otherwise have been done when processing
+the element.
+
+Executing a splittable DoFn follows these steps:
+
+1. Each element is paired with a restriction (e.g., a filename is paired with an offset range representing the whole file).
+2. Each element and restriction pair is split (e.g., offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g., the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a

Review comment:
       You can leave "SDF" in normal font

##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}

Review comment:
       Missing period
   
   ## 12. Splittable `DoFns` {#splittable-dofns}




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org