You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:01 UTC
[beam] 01/04: [BEAM-9801] Pass in fire timestamp and pane info to timer callback
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6f1c2f4e81930e0f1f0b482dedc1dbdcca2caac0
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Wed Apr 22 17:08:59 2020 +0200
[BEAM-9801] Pass in fire timestamp and pane info to timer callback
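Pass the timer's fire timestamp and pane info through to the timer callback instead of `None`. Without them, setting a timer from inside a timer callback fails while encoding the timer's hold timestamp: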
```
INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
INFO:apache_beam.utils.subprocess_server: response = task()
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
INFO:apache_beam.utils.subprocess_server: lambda: self.create_worker().do_instruction(request), request)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
INFO:apache_beam.utils.subprocess_server: getattr(request, request_type), request.instruction_id)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
INFO:apache_beam.utils.subprocess_server: bundle_processor.process_bundle(instruction_id))
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 910, in process_bundle
INFO:apache_beam.utils.subprocess_server: element.timer_family_id, timer_data)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/operations.py", line 688, in process_timer
INFO:apache_beam.utils.subprocess_server: timer_data.fire_timestamp)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 990, in process_user_timer
INFO:apache_beam.utils.subprocess_server: self._reraise_augmented(exn)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1043, in _reraise_augmented
INFO:apache_beam.utils.subprocess_server: raise_with_traceback(new_exn)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 988, in process_user_timer
INFO:apache_beam.utils.subprocess_server: self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 517, in invoke_user_timer
INFO:apache_beam.utils.subprocess_server: self.user_state_context, key, window, timestamp))
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1093, in process_outputs
INFO:apache_beam.utils.subprocess_server: for result in results:
INFO:apache_beam.utils.subprocess_server: File "/Users/max/Dev/beam/sdks/python/apache_beam/testing/load_tests/pardo_test.py", line 185, in process_timer
INFO:apache_beam.utils.subprocess_server: timer.set(0)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 589, in set
INFO:apache_beam.utils.subprocess_server: self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 651, in encode_to_stream
INFO:apache_beam.utils.subprocess_server: value.hold_timestamp, out, True)
INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 608, in encode_to_stream
INFO:apache_beam.utils.subprocess_server: millis = value.micros // 1000
INFO:apache_beam.utils.subprocess_server:AttributeError: 'NoneType' object has no attribute 'micros' [while running 'GenerateLoad']
```
---
sdks/python/apache_beam/runners/common.py | 16 +++++++++-------
.../apache_beam/runners/direct/transform_evaluator.py | 4 +++-
.../runners/portability/fn_api_runner/execution.py | 6 ++++++
.../runners/portability/fn_api_runner/fn_runner.py | 7 +++++--
.../runners/portability/fn_api_runner/fn_runner_test.py | 12 ++++++++++--
sdks/python/apache_beam/runners/worker/operations.py | 3 ++-
6 files changed, 35 insertions(+), 13 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 156340f..cafa4a1 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -221,7 +221,8 @@ class MethodWrapper(object):
if self.watermark_estimator_provider is None:
self.watermark_estimator_provider = NoOpWatermarkEstimatorProvider()
- def invoke_timer_callback(self, user_state_context, key, window, timestamp):
+ def invoke_timer_callback(
+ self, user_state_context, key, window, timestamp, pane_info):
# TODO(ccy): support side inputs.
kwargs = {}
if self.has_userstate_arguments:
@@ -229,10 +230,10 @@ class MethodWrapper(object):
kwargs[kw] = user_state_context.get_state(state_spec, key, window)
for kw, timer_spec in self.timer_args_to_replace.items():
kwargs[kw] = user_state_context.get_timer(
- timer_spec, key, window, None, None)
+ timer_spec, key, window, timestamp, pane_info)
if self.timestamp_arg_name:
- kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp)
+ kwargs[self.timestamp_arg_name] = Timestamp.of(timestamp)
if self.window_arg_name:
kwargs[self.window_arg_name] = window
if self.key_arg_name:
@@ -509,12 +510,12 @@ class DoFnInvoker(object):
"""
self.signature.teardown_lifecycle_method.method_value()
- def invoke_user_timer(self, timer_spec, key, window, timestamp):
+ def invoke_user_timer(self, timer_spec, key, window, timestamp, pane_info):
# self.output_processor is Optional, but in practice it won't be None here
self.output_processor.process_outputs(
WindowedValue(None, timestamp, (window, )),
self.signature.timer_methods[timer_spec].invoke_timer_callback(
- self.user_state_context, key, window, timestamp))
+ self.user_state_context, key, window, timestamp, pane_info))
def invoke_create_watermark_estimator(self, estimator_state):
return self.signature.create_watermark_estimator_method.method_value(
@@ -983,9 +984,10 @@ class DoFnRunner:
assert isinstance(self.do_fn_invoker, PerWindowInvoker)
return self.do_fn_invoker.current_element_progress()
- def process_user_timer(self, timer_spec, key, window, timestamp):
+ def process_user_timer(self, timer_spec, key, window, timestamp, pane_info):
try:
- self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
+ self.do_fn_invoker.invoke_user_timer(
+ timer_spec, key, window, timestamp, pane_info)
except BaseException as exn:
self._reraise_augmented(exn)
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 474ad0d..e2037d6 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -867,7 +867,9 @@ class _ParDoEvaluator(_TransformEvaluator):
timer_spec,
self.key_coder.decode(timer_firing.encoded_key),
timer_firing.window,
- timer_firing.timestamp)
+ timer_firing.timestamp,
+ # TODO Add paneinfo to timer_firing in DirectRunner
+ None)
def process_element(self, element):
self.runner.process(element)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
index e62d8a8..b552016 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
@@ -115,6 +115,12 @@ class ListBuffer(object):
self._inputs = []
self._grouped_output = None
+ def reset(self):
+ """Resets a cleared buffer for reuse."""
+ if not self.cleared:
+ raise RuntimeError('Trying to reset a non-cleared ListBuffer.')
+ self.cleared = False
+
class GroupingBuffer(object):
"""Used to accumulate groupded (shuffled) results."""
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 34b605f..d3e378e 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -966,10 +966,13 @@ class BundleManager(object):
(result_future.is_done() and result_future.get().error)):
if isinstance(output, beam_fn_api_pb2.Elements.Timer):
with BundleManager._lock:
- self._get_buffer(
+ timer_buffer = self._get_buffer(
expected_output_timers[(
output.transform_id, output.timer_family_id)],
- output.transform_id).append(output.timers)
+ output.transform_id)
+ if timer_buffer.cleared:
+ timer_buffer.reset()
+ timer_buffer.append(output.timers)
if isinstance(output, beam_fn_api_pb2.Elements.Data):
with BundleManager._lock:
self._get_buffer(
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 3bc977b..1d2400a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -327,6 +327,7 @@ class FnApiRunnerTest(unittest.TestCase):
def test_pardo_timers(self):
timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+ state_spec = userstate.CombiningValueStateSpec('num_called', sum)
class TimerDoFn(beam.DoFn):
def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
@@ -335,7 +336,14 @@ class FnApiRunnerTest(unittest.TestCase):
timer.set(2 * ts)
@userstate.on_timer(timer_spec)
- def process_timer(self):
+ def process_timer(
+ self,
+ ts=beam.DoFn.TimestampParam,
+ timer=beam.DoFn.TimerParam(timer_spec),
+ state=beam.DoFn.StateParam(state_spec)):
+ if state.read() == 0:
+ state.add(1)
+ timer.set(timestamp.Timestamp(micros=2 * ts.micros))
yield 'fired'
with self.create_pipeline() as p:
@@ -345,7 +353,7 @@ class FnApiRunnerTest(unittest.TestCase):
| beam.ParDo(TimerDoFn())
| beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))
- expected = [('fired', ts) for ts in (20, 200)]
+ expected = [('fired', ts) for ts in (20, 200, 40, 400)]
assert_that(actual, equal_to(expected))
def test_pardo_timers_clear(self):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index c028c8f..66f12f2 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -685,7 +685,8 @@ class DoOperation(Operation):
timer_spec,
timer_data.user_key,
timer_data.windows[0],
- timer_data.fire_timestamp)
+ timer_data.fire_timestamp,
+ timer_data.paneinfo)
def finish(self):
# type: () -> None