You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/02 23:09:49 UTC
[beam] branch master updated: Adding documentation to DirectRunner
functions. (#8464)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fd946ac Adding documentation to DirectRunner functions. (#8464)
fd946ac is described below
commit fd946acdd035fd10914ea6eab18752e61afceb41
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Thu May 2 16:09:38 2019 -0700
Adding documentation to DirectRunner functions. (#8464)
* Adding documentation to DirectRunner functions.
* undoing change to metrics support flag.
---
sdks/python/apache_beam/runners/direct/evaluation_context.py | 10 +++++++++-
sdks/python/apache_beam/runners/direct/watermark_manager.py | 8 ++++++++
2 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index a042ded..54397b8 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -32,6 +32,10 @@ from apache_beam.utils import counters
class _ExecutionContext(object):
+ """Contains the context for the execution of a single PTransform.
+
+ It holds the watermarks for that transform, as well as keyed states.
+ """
def __init__(self, watermarks, keyed_states):
self.watermarks = watermarks
@@ -230,6 +234,10 @@ class EvaluationContext(object):
self._lock = threading.Lock()
def _initialize_keyed_states(self, root_transforms, value_to_consumers):
+ """Initialize user state dicts.
+
+ These dicts track user state per-key, per-transform and per-window.
+ """
transform_keyed_states = {}
for transform in root_transforms:
transform_keyed_states[transform] = {}
@@ -260,7 +268,7 @@ class EvaluationContext(object):
completed_bundle: the bundle that was processed to produce the result.
completed_timers: the timers that were delivered to produce the
completed_bundle.
- result: the TransformResult of evaluating the input bundle
+ result: the ``TransformResult`` of evaluating the input bundle
Returns:
the committed bundles contained within the handled result.
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index c0688a4..23431f1 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -224,6 +224,14 @@ class _TransformWatermarks(object):
self._pending.remove(completed)
def refresh(self):
+ """Refresh the watermark for a given transform.
+
+ This method looks at the watermark coming from all input PTransforms, and
+ the timestamp of the minimum element, as well as any watermark holds.
+
+ Returns:
+ True if the watermark has advanced, and False if it has not.
+ """
with self._lock:
min_pending_timestamp = WatermarkManager.WATERMARK_POS_INF
has_pending_elements = False