You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/11/13 22:12:36 UTC

[beam] branch master updated: Update SDF related documentation.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a8337b8  Update SDF related documentation.
     new d2ff41e  Merge pull request #13313 from [BEAM-10480] Update SDF related documentation.
a8337b8 is described below

commit a8337b8a6b822ec8867d0bdd00bfa1b32e7278f3
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Wed Nov 11 17:03:51 2020 -0800

    Update SDF related documentation.
---
 .../java/org/apache/beam/sdk/transforms/DoFn.java  |  4 +++-
 .../splittabledofn/RestrictionTracker.java         | 23 +++++++++++++++++-
 sdks/python/apache_beam/io/iobase.py               | 12 ++++++----
 sdks/python/apache_beam/transforms/core.py         | 28 +++++++++++++++-------
 4 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 78b6ef7..115b480 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -670,7 +670,9 @@ public abstract class DoFn<InputT extends @Nullable Object, OutputT extends @Nul
    *       representation of work. See {@link GetSize} and {@link RestrictionTracker.HasProgress}
    *       for further details.
    *   <li>It <i>should</i> define a {@link SplitRestriction} method. This method enables runners to
-   *       perform bulk splitting initially allowing for a rapid increase in parallelism. See {@link
+   *       perform bulk splitting initially allowing for a rapid increase in parallelism. If it is
+   *       not defined, no initial split is performed by default. Note that the initial split is a
+   *       different concept from splitting during element processing. See {@link
    *       RestrictionTracker#trySplit} for details about splitting when the current element and
    *       restriction are actively being processed.
    *   <li>It <i>may</i> define a {@link TruncateRestriction} method to choose how to truncate a
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 9ce33bf..77a0592 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -27,6 +27,9 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * Manages access to the restriction and keeps track of its claimed part for a <a
  * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
  *
+ * <p>The restriction may be modified by different threads; however, the system will ensure
+ * sufficient locking such that no methods on the restriction tracker will be called concurrently.
+ *
  * <p>{@link RestrictionTracker}s should implement {@link HasProgress} otherwise poor auto-scaling
  * of workers and/or splitting may result if the progress is an inaccurate representation of the
  * known amount of completed and remaining work.
@@ -49,12 +52,20 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
    *       call to this method).
    *   <li>{@link RestrictionTracker#checkDone} MUST succeed.
    * </ul>
+   *
+   * <p>This method is <b>required</b> to be implemented.
    */
   public abstract boolean tryClaim(PositionT position);
 
   /**
    * Returns a restriction accurately describing the full range of work the current {@link
    * DoFn.ProcessElement} call will do, including already completed work.
+   *
+   * <p>The current restriction returned by this method may be updated dynamically due to
+   * concurrent invocation of other methods of the {@link RestrictionTracker}, for example {@link
+   * RestrictionTracker#trySplit(double)}.
+   *
+   * <p>This method is <b>required</b> to be implemented.
    */
   public abstract RestrictionT currentRestriction();
 
@@ -83,6 +94,9 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
    * <p>The API is recommended to be implemented for a batch pipeline to improve parallel processing
    * performance.
    *
+   * <p>The API is recommended to be implemented for batch pipelines because it is very important
+   * for pipeline scaling and end-to-end pipeline execution.
+   *
    * <p>The API is required to be implemented for a streaming pipeline.
    *
    * @param fractionOfRemainder A hint as to the fraction of work the primary restriction should
@@ -94,10 +108,15 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
   public abstract @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder);
 
   /**
-   * Called by the runner after {@link DoFn.ProcessElement} returns.
+   * Checks whether the restriction has been fully processed.
+   *
+   * <p>Called by the SDK harness after {@link DoFn.ProcessElement} returns.
    *
    * <p>Must throw an exception with an informative error message, if there is still any unclaimed
    * work remaining in the restriction.
+   *
+   * <p>This method is <b>required</b> to be implemented in order to prevent data loss during SDK
+   * processing.
    */
   public abstract void checkDone() throws IllegalStateException;
 
@@ -116,6 +135,8 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
    * <p>It is valid to return {@link IsBounded#BOUNDED} after returning {@link IsBounded#UNBOUNDED}
    * once the end of a restriction is discovered. It is not valid to return {@link
    * IsBounded#UNBOUNDED} after returning {@link IsBounded#BOUNDED}.
+   *
+   * <p>This method is <b>required</b> to be implemented.
    */
   public abstract IsBounded isBounded();
 
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index b0cf4e3..dbc030e 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1193,7 +1193,7 @@ class _RoundRobinKeyFn(core.DoFn):
 
 
 class RestrictionTracker(object):
-  """Manages concurrent access to a restriction.
+  """Manages access to a restriction.
 
   Keeps track of the restrictions claimed part for a Splittable DoFn.
 
@@ -1236,8 +1236,8 @@ class RestrictionTracker(object):
   def check_done(self):
     """Checks whether the restriction has been fully processed.
 
-    Called by the runner after iterator returned by ``DoFn.process()`` has been
-    fully read.
+    Called by the SDK harness after iterator returned by ``DoFn.process()``
+    has been fully read.
 
     This method must raise a `ValueError` if there is still any unclaimed work
     remaining in the restriction when this method is invoked. Exception raised
@@ -1294,7 +1294,8 @@ class RestrictionTracker(object):
 
   def try_claim(self, position):
     """Attempts to claim the block of work in the current restriction
-    identified by the given position.
+    identified by the given position. Each claimed position MUST be a valid
+    split point.
 
     If this succeeds, the DoFn MUST execute the entire block of work. If it
     fails, the ``DoFn.process()`` MUST return ``None`` without performing any
@@ -1332,7 +1333,8 @@ class RestrictionTracker(object):
 
 class WatermarkEstimator(object):
   """A WatermarkEstimator which is used for estimating output_watermark based on
-  the timestamp of output records or manual modifications.
+  the timestamp of output records or manual modifications. Please refer to
+  ``watermark_estimators`` for commonly used watermark estimators.
 
   The base class provides common APIs that are called by the framework, which
   are also accessible inside a DoFn.process() body. Derived watermark estimator
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index dd865a0..e2b4ae3 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -226,13 +226,14 @@ class RestrictionProvider(object):
   for the following methods:
   * create_tracker()
   * initial_restriction()
+  * restriction_size()
 
   Optionally, ``RestrictionProvider`` may override default implementations of
   following methods:
   * restriction_coder()
-  * restriction_size()
   * split()
   * split_and_size()
+  * truncate()
 
   ** Pausing and resuming processing of an element **
 
@@ -240,12 +241,10 @@ class RestrictionProvider(object):
   ``DoFn.process()`` method, a Splittable ``DoFn`` may return an object of type
   ``ProcessContinuation``.
 
-  If provided, ``ProcessContinuation`` object specifies that runner should
-  later re-invoke ``DoFn.process()`` method to resume processing the current
-  element and the manner in which the re-invocation should be performed. A
-  ``ProcessContinuation`` object must only be specified as the last element of
-  the iterator. If a ``ProcessContinuation`` object is not provided the runner
-  will assume that the current input element has been fully processed.
+  If ``restriction_tracker.defer_remainder()`` is called in ``DoFn.process()``,
+  it means that the runner should later re-invoke the ``DoFn.process()`` method
+  to resume processing the current element; the call also specifies the manner
+  in which the re-invocation should be performed.
 
   ** Updating output watermark **
 
@@ -282,7 +281,13 @@ class RestrictionProvider(object):
     raise NotImplementedError
 
   def split(self, element, restriction):
-    """Splits the given element and restriction.
+    """Splits the given element and restriction initially.
+
+    This method enables runners to perform bulk splitting initially allowing for
+    a rapid increase in parallelism. Note that the initial split is a different
+    concept from splitting during element processing. Please refer to
+    ``iobase.RestrictionTracker.try_split`` for details about splitting when the
+    current element and restriction are actively being processed.
 
     Returns an iterator of restrictions. The total set of elements produced by
     reading input element for each of the returned restrictions should be the
@@ -291,6 +296,9 @@ class RestrictionProvider(object):
 
     This API is optional if ``split_and_size`` has been implemented.
 
+    If this method is not overridden, no initial splitting is performed on
+    each restriction.
+
     """
     yield restriction
 
@@ -337,6 +345,10 @@ class RestrictionProvider(object):
     Return a truncated finite restriction if further processing is required
     otherwise return None to represent that no further processing of this
     restriction is required.
+
+    The default behavior when a pipeline is being drained is that bounded
+    restrictions are processed entirely, while unbounded restrictions are
+    processed until a checkpoint is possible.
     """
     restriction_tracker = self.create_tracker(restriction)
     if restriction_tracker.is_bounded():