Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/08/07 21:20:51 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #27632: Initial pass at modernizing the Runner author guide.

lostluck commented on code in PR #27632:
URL: https://github.com/apache/beam/pull/27632#discussion_r1279545345


##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -160,64 +161,51 @@ it from the main input, which is processed one element at a time. The SDK/user
 prepares a `PCollection` adequately, the runner materializes it, and then the
 runner feeds it to the `DoFn`.
 
-What you will need to implement is to inspect the materialization requested for
-the side input, and prepare it appropriately, and corresponding interactions
-when a `DoFn` reads the side inputs.
-
-The details and available support code vary by language.
-
-**Java**
-
-If you are using one of the above `DoFnRunner` classes, then the interface for
-letting them request side inputs is
-[`SideInputReader`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java).
-It is a simple mapping from side input and window to a value. The `DoFnRunner`
-will perform a mapping with the
-[`WindowMappingFn`](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java)
-to request the appropriate window so you do not worry about invoking this UDF.
-When using the Fn API, it will be the SDK harness that maps windows as well.
-
-A simple, but not necessarily optimal approach to building a
-[`SideInputReader`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java)
-is to use a state backend. In our Java support code, this is called
-[`StateInternals`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java)
-and you can build a
-[`SideInputHandler`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java)
-that will use your `StateInternals` to materialize a `PCollection` into the
-appropriate side input view and then yield the value when requested for a
-particular side input and window.
+Unlike main input data, which is *pushed* by the runner to the `ParDo` (generally
+via the FnAPI Data channel), side input data is *pulled* by the `ParDo`
+from the runner (generally over the FnAPI State channel).
+
+A side input is accessed via a specific `access_pattern`.
+There are currently two access patterns enumerated in the
+`StandardSideInputTypes` proto: `beam:side_input:iterable:v1`, which indicates
+the runner must return all values in the `PCollection` for a specific
+window, and `beam:side_input:multimap:v1`, which indicates the runner must return
+all values corresponding to a specific key and window.
+Being able to serve these access patterns efficiently may influence how a
+runner materializes this `PCollection`.
+
+Side inputs can be detected by looking at the `side_inputs` map in the
+`ParDoPayload` of `ParDo` transforms.
+The `ParDo` operation itself is responsible for invoking the
+`window_mapping_fn` (before invoking the runner) and `view_fn` (on the
+runner-returned values), so the runner need not concern itself with these
+fields.
 
 When a side input is needed but the side input has no data associated with it
 for a given window, elements in that window must be deferred until the side
-input has some data. The aforementioned
+input has some data or the watermark has advanced sufficiently that
+we can be sure there will be no data for that window. The
 [`PushBackSideInputDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java)
-is used to implement this.
-
-**Python**
-
-In Python, [`SideInputMap`](https://beam.apache.org/releases/pydoc/2.0.0/apache_beam.transforms.html#apache_beam.transforms.sideinputs.SideInputMap) maps
-windows to side input values. The `WindowMappingFn` manifests as a simple
-function. See
-[sideinputs.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sideinputs.py).
+is an example of implementing this.
 
 #### State and Timers
 
 _Main design document: [https://s.apache.org/beam-state](https://s.apache.org/beam-state)_
 
 When a `ParDo` includes state and timers, its execution on your runner is usually
-very different. See the full details beyond those covered here.
+very different. In particular, the state must be persisted when the bundle
+completes and retrieved for future bundles. Timers that are set must also be
+injected into future bundles as the watermark advances sufficiently.
 
-State and timers are partitioned per key and window. You may need or want to
+State and timers are partitioned per key and window; that is, a `DoFn`
+processing a given key must have a consistent view of the state and timers
+across all elements that share this key. You may need or want to
 explicitly shuffle data to support this.
+Once the watermark has passed the end of the window (plus an allowance for
+allowed lateness, if any), state associated with this window can be dropped.
 
-**Java**
-
-We provide
-[`StatefulDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java)
-to help with state cleanup. The non-user-facing interface
-[`StateInternals`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java)
-is what a runner generally implements, and then the Beam support code can use
-this to implement user-facing state.
+State setting and retrieval is performed on the FnAPI State channel, whereas
+timer setting and firing happens on the FnAPI Data channel.
 
 #### Splittable DoFn

Review Comment:
   We need to adjust the first paragraph of this section since we remove the later documentation about Read, BoundedSource, or UnboundedSource. 
   It's also referencing things that have yet to be introduced in the document, which makes the document harder to read linearly.
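
To make the two access patterns in the side-input hunk concrete, here is a minimal in-memory sketch of how a runner might materialize one side-input `PCollection` so each request is a single lookup. This is not Beam code: `SideInputStore` and its methods are hypothetical, windows and keys are plain Python values standing in for encoded bytes, and a real runner would answer these requests over the FnAPI State channel.

```python
from collections import defaultdict

# URNs of the two standard access patterns named in the diff.
ITERABLE = "beam:side_input:iterable:v1"
MULTIMAP = "beam:side_input:multimap:v1"


class SideInputStore:
    """Hypothetical runner-side materialization of one side-input PCollection."""

    def __init__(self, access_pattern):
        if access_pattern not in (ITERABLE, MULTIMAP):
            raise ValueError(f"unsupported access pattern: {access_pattern}")
        self.access_pattern = access_pattern
        # Index chosen so each request is one lookup rather than a scan:
        #   iterable: window -> [value, ...]
        #   multimap: (window, key) -> [value, ...]
        self._index = defaultdict(list)

    def add(self, window, element):
        if self.access_pattern == ITERABLE:
            self._index[window].append(element)
        else:
            key, value = element  # multimap side inputs hold key-value pairs
            self._index[(window, key)].append(value)

    def _require(self, pattern):
        if self.access_pattern != pattern:
            raise ValueError(f"side input was materialized for {self.access_pattern}")

    def get_iterable(self, window):
        self._require(ITERABLE)
        # .get does not insert an empty entry for unknown windows.
        return list(self._index.get(window, []))

    def get_multimap(self, window, key):
        self._require(MULTIMAP)
        return list(self._index.get((window, key), []))


store = SideInputStore(MULTIMAP)
store.add("w1", ("a", 1))
store.add("w1", ("b", 2))
store.add("w1", ("a", 3))
print(store.get_multimap("w1", "a"))  # [1, 3]
```

Applying `window_mapping_fn` and `view_fn` is left to the SDK, as the hunk states, so the store only has to return raw values for an already-mapped window.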

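The side-input deferral rule and the state-cleanup rule in the same hunk both come down to comparing a watermark against the end of a window. A minimal sketch, assuming integer-millisecond timestamps, windows with an exclusive end, and "advanced sufficiently" meaning "past the window's last timestamp"; `IntervalWindow`, `is_expired`, `side_input_ready`, and `StateTable` are hypothetical names, not Beam APIs.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class IntervalWindow:
    start: int  # inclusive, milliseconds
    end: int    # exclusive, milliseconds

    def max_timestamp(self):
        return self.end - 1


def is_expired(window, watermark, allowed_lateness=0):
    # Past the end of the window plus allowed lateness: no more data can
    # arrive for it, so its state (and timers) can be dropped.
    return watermark > window.max_timestamp() + allowed_lateness


def side_input_ready(window, has_data, side_input_watermark):
    # Deferred main-input elements in `window` can be released once the side
    # input has data for the window, or its watermark shows none will arrive.
    return has_data or side_input_watermark > window.max_timestamp()


@dataclass
class StateTable:
    # (key, window) -> value; state is partitioned per key and window.
    cells: dict = field(default_factory=dict)

    def garbage_collect(self, watermark, allowed_lateness=0):
        expired = [kw for kw in self.cells
                   if is_expired(kw[1], watermark, allowed_lateness)]
        for kw in expired:
            del self.cells[kw]
        return len(expired)


first, second = IntervalWindow(0, 60_000), IntervalWindow(60_000, 120_000)
table = StateTable({("user1", first): 3, ("user1", second): 5})
print(table.garbage_collect(watermark=60_000))  # 1: only `first` has expired
```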


##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -89,6 +117,13 @@ But if your data is arriving as a stream, then you will want to terminate a
 bundle in order to achieve appropriate latency, so bundles may be just a few
 elements.
 
+A bundle is the unit of commitment in Beam. If an error is encountered while
+processing a bundle, all the prior outputs of that bundle (including any
+modifications to state or timers) must be discarded and the entire bundle
+retried. Upon successful completion of a bundle, its outputs, together with
+any state/timer modifications and watermark updates, must be committed
+atomically.
+
 #### The DoFn Lifecycle
 
 While each language's SDK is free to make different decisions, the Python and

Review Comment:
   Since we're making this doc more portability-aware, I think it would be good to emphasize that an SDK-independent runner leaves this detail to the SDK to handle. 
   
   I'd personally prefer a more portability-first approach to this document, but that would be a much larger rewrite that doesn't have to happen in this PR. It does feel like we're kind of going back and forth (e.g., the Side Input section is portability-first due to the hard FnAPI descriptions, and doesn't mention either Direct approach at all.)
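
The commit rule added in the bundle hunk (discard everything on failure; on success, commit outputs, state/timer changes, and watermark updates together) can be sketched as buffering all of a bundle's effects and publishing them in one step. This is a hypothetical sketch: `process_bundle`, `BundleFailed`, and the dict-based store are stand-ins rather than Beam APIs, and plain dict assignments are not a durable transaction.

```python
import copy


class BundleFailed(Exception):
    pass


def process_bundle(store, elements, process_fn, new_watermark, max_attempts=3):
    """Hypothetical sketch: run one bundle and commit its effects all at once.

    `store` is a dict with "outputs" (list), "state" (dict) and "watermark" (int).
    `process_fn(element, state)` returns a list of outputs, may mutate `state`,
    and may raise to signal a failure.
    """
    for attempt in range(1, max_attempts + 1):
        # Buffer every effect privately so a failure leaves `store` untouched.
        pending_outputs = []
        pending_state = copy.deepcopy(store["state"])
        try:
            for element in elements:
                pending_outputs.extend(process_fn(element, pending_state))
        except Exception:
            continue  # discard this attempt's outputs and state, retry the bundle
        # Success: publish outputs, state and the watermark update together.
        # A real runner needs a durable, transactional commit here; these
        # assignments only illustrate what must move as one unit.
        store["outputs"].extend(pending_outputs)
        store["state"] = pending_state
        store["watermark"] = max(store["watermark"], new_watermark)
        return attempt
    raise BundleFailed(f"bundle failed after {max_attempts} attempts")


calls = 0


def flaky(element, state):
    global calls
    calls += 1
    state[element] = state.get(element, 0) + 1
    if calls == 2:  # fail partway through the first attempt
        raise RuntimeError("transient failure")
    return [element * 10]


store = {"outputs": [], "state": {}, "watermark": 0}
print(process_bundle(store, [1, 2, 3], flaky, new_watermark=100))  # 2
```

Because the failed first attempt only touched private copies, the committed outputs and state contain no duplicates from it.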



##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -249,8 +251,9 @@ to group in a way that is consistent with grouping by those bytes, even if you
 have some special knowledge of the types involved.
 
 The elements you are processing will be key-value pairs, and you'll need to extract
-the keys. For this reason, the format of key-value pairs is standardized and
-shared across all SDKS. See either
+the keys. For this reason, the format of key-value pairs is
+[standardized and shared](https://github.com/apache/beam/blob/release-2.49.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L838)
+across all SDKs. See either
 [`KvCoder`](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/coders/KvCoder.html)
 in Java or
 [`TupleCoder`](https://beam.apache.org/releases/pydoc/2.0.0/apache_beam.coders.html#apache_beam.coders.coders.TupleCoder.key_coder)

Review Comment:
   These are referring to the v2.0.0 docs. They likely haven't changed, but is it possible for us to have version-agnostic links? Or to link to the code in the repo instead of the docs?
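
The point of the standardized key-value format in the GroupByKey hunk is that a runner can group by the encoded key bytes without understanding the key type. A minimal sketch, assuming a `beam:coder:kv:v1` whose key component is `beam:coder:bytes:v1` (nested form: varint length, then raw bytes) followed by a `beam:coder:varint:v1` value; the helper names are hypothetical and the varint handles non-negative integers only.

```python
def encode_varint(n):
    # Unsigned varint: 7 bits per byte, least-significant group first,
    # high bit set on every byte except the last.
    if n < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        low, n = n & 0x7F, n >> 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


def decode_varint(buf, pos=0):
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_kv(key, value):
    # Key (bytes) in nested form, followed by the value (int) as a varint.
    return encode_varint(len(key)) + key + encode_varint(value)


def split_kv(encoded):
    # Returns (encoded key bytes, encoded value bytes) without decoding the key.
    length, pos = decode_varint(encoded)
    end = pos + length
    return encoded[:end], encoded[end:]


def group_by_encoded_key(encoded_kvs):
    groups = {}  # insertion-ordered: encoded key -> [encoded value, ...]
    for kv in encoded_kvs:
        key_bytes, value_bytes = split_kv(kv)
        groups.setdefault(key_bytes, []).append(value_bytes)
    return groups


groups = group_by_encoded_key(
    [encode_kv(b"a", 1), encode_kv(b"b", 2), encode_kv(b"a", 300)])
print({k: [decode_varint(v)[0] for v in vs] for k, vs in groups.items()})
# {b'\x01a': [1, 300], b'\x01b': [2]}
```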



##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -399,59 +370,28 @@ fast path as an optimization.
 ### Special mention: the Combine composite
 
 A composite transform that is almost always treated specially by a runner is
-`Combine` (per key), which applies an associative and commutative operator to
+`CombinePerKey`, which applies an associative and commutative operator to
 the elements of a `PCollection`. This composite is not a primitive. It is
 implemented in terms of `ParDo` and `GroupByKey`, so your runner will work
 without treating it specially, but it does carry additional information that you
 probably want to use for optimizations: the associative-commutative operator,
 known as a `CombineFn`.
 
+Generally, runners will want to implement this via what is called
+combiner lifting: a new operation that does partial (within-bundle) combining
+is placed before the `GroupByKey`, which often requires a slight
+modification of what comes after the `GroupByKey` as well.
+An example of this transformation can be found in the

Review Comment:
   To toot my own horn a little, [handlecombine.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go#L67) in the prism runner has a detailed comment about the PCollection/Coder adjustments that occur when lifting a combine, which is a bit easier to read than the Python code (where it's a bit of a game of "spot the differences" between the repeated clauses).
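
Combiner lifting, as described in the `CombinePerKey` hunk, can be illustrated end to end in plain Python. `MeanFn` is a standalone class whose four methods borrow the names of a Beam `CombineFn` but which does not import Beam; `precombine`, `group_by_key`, and `merge_and_extract` are hypothetical names for the three stages. The pairs crossing the shuffle hold accumulators rather than raw inputs, which is why the stage after the `GroupByKey` (and, in a real runner, the coder of the intermediate `PCollection`) has to change.

```python
from collections import defaultdict


class MeanFn:
    """Standalone stand-in using the method names of a Beam CombineFn."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        total = count = 0
        for t, c in accumulators:
            total += t
            count += c
        return (total, count)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


def precombine(fn, bundle):
    # Lifted stage before the GroupByKey: partial combining within one bundle.
    # It emits (key, accumulator) pairs, so fewer, differently typed elements
    # cross the shuffle.
    accumulators = {}
    for key, value in bundle:
        if key not in accumulators:
            accumulators[key] = fn.create_accumulator()
        accumulators[key] = fn.add_input(accumulators[key], value)
    return list(accumulators.items())


def group_by_key(pairs):
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def merge_and_extract(fn, grouped):
    # Modified stage after the GroupByKey: it receives accumulators, so it
    # merges them rather than adding raw inputs.
    return {key: fn.extract_output(fn.merge_accumulators(accs))
            for key, accs in grouped.items()}


fn = MeanFn()
bundles = [[("a", 1), ("a", 3), ("b", 10)], [("a", 5), ("b", 20)]]
shuffled = [pair for bundle in bundles for pair in precombine(fn, bundle)]
print(len(shuffled), merge_and_extract(fn, group_by_key(shuffled)))
# 4 {'a': 3.0, 'b': 15.0}
```

Five inputs become four shuffled accumulators here; with larger bundles the reduction is usually much bigger, which is the motivation for lifting.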



##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -555,6 +495,10 @@ All runner code should go in it's own package in `apache_beam/runners` directory
 Register the new runner in the `create_runner` function of `runner.py` so that the
 partial name is matched with the correct class to be used.
 
+Python runners can also be identified (e.g., when passing the runner parameter)
+by their fully qualified name, whether or not they live in the Beam repository.
+
+

Review Comment:
   rm spare line
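
One way a runner given by fully qualified name can be turned into a class is a dynamic import of the module part followed by an attribute lookup. This is a hypothetical helper, not Beam's `create_runner`, and it skips the short-name matching described in the hunk's context; `collections.OrderedDict` is used only because it is importable everywhere.

```python
import importlib


def resolve_runner_class(name):
    """Hypothetical helper: import a class from a dotted path such as
    "my_pkg.runners.MyRunner", so the class need not live in the Beam repo."""
    if "." not in name:
        raise ValueError(f"{name!r} is not a fully qualified name")
    module_path, _, class_name = name.rpartition(".")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError:
        raise ValueError(
            f"module {module_path!r} has no attribute {class_name!r}") from None


print(resolve_runner_class("collections.OrderedDict"))
# <class 'collections.OrderedDict'>
```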



##########
website/www/site/content/en/contribute/runner-guide.md:
##########
@@ -57,6 +56,19 @@ native environment, this may look like throwing an
 `UnsupportedOperationException`.  The Runner API RPCs will make this explicit,
 for cross-language portability.
 
+### Implementing the Impulse primitive
+
+`Impulse` is a PTransform that takes no inputs and produces exactly one output
+during the lifetime of the pipeline, which should be the empty bytes in the
+global window with the minimum timestamp. This has the encoded value of
+`7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00` when encoded with the standard
+windowed value coder.
+
+Though `Impulse` is generally not invoked by a user, it is the only root
+primitive operation, and other root operations (like Reads and `Create`)

Review Comment:
   Consider removing the "Read" mention here, since it's removed later in the doc anyway.
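
The 14 bytes quoted in the Impulse hunk can be reassembled piece by piece as a sanity check. A standalone sketch, assuming the windowed value layout is timestamp, window count plus windows, pane info, then value; that the minimum timestamp is the minimum int64 microseconds value expressed in milliseconds; and that timestamps are written big-endian with the sign bit flipped. The pane byte `0x0f` is copied from the quoted bytes rather than derived, and `encode_timestamp` and `encode_impulse` are hypothetical names.

```python
import struct

# Minimum int64 microseconds, in milliseconds (truncating toward zero).
MIN_TIMESTAMP_MILLIS = -(2**63 // 1000)


def encode_timestamp(millis):
    # Big-endian 64-bit value with the sign bit flipped, so encoded bytes
    # sort in the same order as the timestamps.
    return struct.pack(">Q", millis + 2**63)


def encode_impulse():
    timestamp = encode_timestamp(MIN_TIMESTAMP_MILLIS)
    windows = struct.pack(">i", 1)  # one window; the global window adds no bytes
    pane = bytes([0x0F])            # copied from the quoted encoding
    value = bytes([0])              # empty bytes: varint length 0, no payload
    return timestamp + windows + pane + value


print(encode_impulse().hex(" "))
# 7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00
```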


