You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/02 23:09:49 UTC

[beam] branch master updated: Adding documentation to DirectRunner functions. (#8464)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fd946ac  Adding documentation to DirectRunner functions. (#8464)
fd946ac is described below

commit fd946acdd035fd10914ea6eab18752e61afceb41
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Thu May 2 16:09:38 2019 -0700

    Adding documentation to DirectRunner functions. (#8464)
    
    * Adding documentation to DirectRunner functions.
    
    * undoing change to metrics support flag.
---
 sdks/python/apache_beam/runners/direct/evaluation_context.py | 10 +++++++++-
 sdks/python/apache_beam/runners/direct/watermark_manager.py  |  8 ++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index a042ded..54397b8 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -32,6 +32,10 @@ from apache_beam.utils import counters
 
 
 class _ExecutionContext(object):
+  """Contains the context for the execution of a single PTransform.
+
+  It holds the watermarks for that transform, as well as keyed states.
+  """
 
   def __init__(self, watermarks, keyed_states):
     self.watermarks = watermarks
@@ -230,6 +234,10 @@ class EvaluationContext(object):
     self._lock = threading.Lock()
 
   def _initialize_keyed_states(self, root_transforms, value_to_consumers):
+    """Initialize user state dicts.
+
+    These dicts track user state per-key, per-transform and per-window.
+    """
     transform_keyed_states = {}
     for transform in root_transforms:
       transform_keyed_states[transform] = {}
@@ -260,7 +268,7 @@ class EvaluationContext(object):
       completed_bundle: the bundle that was processed to produce the result.
       completed_timers: the timers that were delivered to produce the
                         completed_bundle.
-      result: the TransformResult of evaluating the input bundle
+      result: the ``TransformResult`` of evaluating the input bundle
 
     Returns:
       the committed bundles contained within the handled result.
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index c0688a4..23431f1 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -224,6 +224,14 @@ class _TransformWatermarks(object):
         self._pending.remove(completed)
 
   def refresh(self):
+    """Refresh the watermark for a given transform.
+
+    This method looks at the watermark coming from all input PTransforms, and
+    the timestamp of the minimum element, as well as any watermark holds.
+
+    Returns:
+      True if the watermark has advanced, and False if it has not.
+    """
     with self._lock:
       min_pending_timestamp = WatermarkManager.WATERMARK_POS_INF
       has_pending_elements = False