You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/02/25 19:20:36 UTC

[beam] branch master updated: Use NoOpWatermarkEstimator in sdf_direct_runner

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 40411e6  Use NoOpWatermarkEstimator in sdf_direct_runner
     new 5369582  Merge pull request #10957 from boyuanzz/sdf_direct_runner
40411e6 is described below

commit 40411e62bad54694d254bb79d642fe4518171b0b
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Mon Feb 24 18:36:41 2020 -0800

    Use NoOpWatermarkEstimator in sdf_direct_runner
---
 sdks/python/apache_beam/runners/direct/sdf_direct_runner.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index cd83448..8da90ab 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -43,6 +43,7 @@ from apache_beam.runners.common import OutputProcessor
 from apache_beam.runners.direct.evaluation_context import DirectStepContext
 from apache_beam.runners.direct.util import KeyedWorkItem
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
+from apache_beam.runners.sdf_utils import NoOpWatermarkEstimatorProvider
 from apache_beam.transforms.core import ParDo
 from apache_beam.transforms.core import ProcessContinuation
 from apache_beam.transforms.ptransform import PTransform
@@ -462,10 +463,13 @@ class SDFProcessElementInvoker(object):
       checkpoint_state.checkpointed = object()
 
     output_processor.reset()
+    noop_estimator = (
+        NoOpWatermarkEstimatorProvider().create_watermark_estimator(None))
     Timer(self._max_duration, initiate_checkpoint).start()
     sdf_invoker.invoke_process(
         element,
         restriction_tracker=tracker,
+        watermark_estimator=noop_estimator,
         additional_args=args,
         additional_kwargs=kwargs)
 
@@ -511,7 +515,8 @@ class _OutputProcessor(OutputProcessor):
   def __init__(self):
     self.output_iter = None
 
-  def process_outputs(self, windowed_input_element, output_iter):
+  def process_outputs(
+      self, windowed_input_element, output_iter, watermark_estimator=None):
     # type: (WindowedValue, Iterable[Any]) -> None
     self.output_iter = output_iter
 
@@ -520,5 +525,6 @@ class _OutputProcessor(OutputProcessor):
 
 
 class _NoneShallPassOutputProcessor(OutputProcessor):
-  def process_outputs(self, windowed_input_element, output_iter):
+  def process_outputs(
+      self, windowed_input_element, output_iter, watermark_estimator=None):
     raise RuntimeError()