You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/02/25 19:20:36 UTC
[beam] branch master updated: Use NoOpWatermarkEstimator in sdf_direct_runner
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 40411e6 Use NoOpWatermarkEstimator in sdf_direct_runner
new 5369582 Merge pull request #10957 from boyuanzz/sdf_direct_runner
40411e6 is described below
commit 40411e62bad54694d254bb79d642fe4518171b0b
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Mon Feb 24 18:36:41 2020 -0800
Use NoOpWatermarkEstimator in sdf_direct_runner
---
sdks/python/apache_beam/runners/direct/sdf_direct_runner.py | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index cd83448..8da90ab 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -43,6 +43,7 @@ from apache_beam.runners.common import OutputProcessor
from apache_beam.runners.direct.evaluation_context import DirectStepContext
from apache_beam.runners.direct.util import KeyedWorkItem
from apache_beam.runners.direct.watermark_manager import WatermarkManager
+from apache_beam.runners.sdf_utils import NoOpWatermarkEstimatorProvider
from apache_beam.transforms.core import ParDo
from apache_beam.transforms.core import ProcessContinuation
from apache_beam.transforms.ptransform import PTransform
@@ -462,10 +463,13 @@ class SDFProcessElementInvoker(object):
checkpoint_state.checkpointed = object()
output_processor.reset()
+ noop_estimator = (
+ NoOpWatermarkEstimatorProvider().create_watermark_estimator(None))
Timer(self._max_duration, initiate_checkpoint).start()
sdf_invoker.invoke_process(
element,
restriction_tracker=tracker,
+ watermark_estimator=noop_estimator,
additional_args=args,
additional_kwargs=kwargs)
@@ -511,7 +515,8 @@ class _OutputProcessor(OutputProcessor):
def __init__(self):
self.output_iter = None
- def process_outputs(self, windowed_input_element, output_iter):
+ def process_outputs(
+ self, windowed_input_element, output_iter, watermark_estimator=None):
# type: (WindowedValue, Iterable[Any]) -> None
self.output_iter = output_iter
@@ -520,5 +525,6 @@ class _OutputProcessor(OutputProcessor):
class _NoneShallPassOutputProcessor(OutputProcessor):
- def process_outputs(self, windowed_input_element, output_iter):
+ def process_outputs(
+ self, windowed_input_element, output_iter, watermark_estimator=None):
raise RuntimeError()