You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:01 UTC

[beam] 01/04: [BEAM-9801] Pass in fire timestamp and pane info to timer callback

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6f1c2f4e81930e0f1f0b482dedc1dbdcca2caac0
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Wed Apr 22 17:08:59 2020 +0200

    [BEAM-9801] Pass in fire timestamp and pane info to timer callback
    
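    Pass the timer's fire timestamp and pane info to the timer callback
    instead of None. This avoids the following error when a timer is set
    from within a timer callback: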
    
    ```
    INFO:apache_beam.utils.subprocess_server:Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    INFO:apache_beam.utils.subprocess_server: response = task()
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    INFO:apache_beam.utils.subprocess_server: lambda: self.create_worker().do_instruction(request), request)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    INFO:apache_beam.utils.subprocess_server: getattr(request, request_type), request.instruction_id)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    INFO:apache_beam.utils.subprocess_server: bundle_processor.process_bundle(instruction_id))
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 910, in process_bundle
    INFO:apache_beam.utils.subprocess_server: element.timer_family_id, timer_data)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/operations.py", line 688, in process_timer
    INFO:apache_beam.utils.subprocess_server: timer_data.fire_timestamp)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 990, in process_user_timer
    INFO:apache_beam.utils.subprocess_server: self._reraise_augmented(exn)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1043, in _reraise_augmented
    INFO:apache_beam.utils.subprocess_server: raise_with_traceback(new_exn)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 988, in process_user_timer
    INFO:apache_beam.utils.subprocess_server: self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 517, in invoke_user_timer
    INFO:apache_beam.utils.subprocess_server: self.user_state_context, key, window, timestamp))
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/common.py", line 1093, in process_outputs
    INFO:apache_beam.utils.subprocess_server: for result in results:
    INFO:apache_beam.utils.subprocess_server: File "/Users/max/Dev/beam/sdks/python/apache_beam/testing/load_tests/pardo_test.py", line 185, in process_timer
    INFO:apache_beam.utils.subprocess_server: timer.set(0)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/runners/worker/bundle_processor.py", line 589, in set
    INFO:apache_beam.utils.subprocess_server: self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 651, in encode_to_stream
    INFO:apache_beam.utils.subprocess_server: value.hold_timestamp, out, True)
    INFO:apache_beam.utils.subprocess_server: File "apache_beam/coders/coder_impl.py", line 608, in encode_to_stream
    INFO:apache_beam.utils.subprocess_server: millis = value.micros // 1000
    INFO:apache_beam.utils.subprocess_server:AttributeError: 'NoneType' object has no attribute 'micros' [while running 'GenerateLoad']
    ```
---
 sdks/python/apache_beam/runners/common.py                | 16 +++++++++-------
 .../apache_beam/runners/direct/transform_evaluator.py    |  4 +++-
 .../runners/portability/fn_api_runner/execution.py       |  6 ++++++
 .../runners/portability/fn_api_runner/fn_runner.py       |  7 +++++--
 .../runners/portability/fn_api_runner/fn_runner_test.py  | 12 ++++++++++--
 sdks/python/apache_beam/runners/worker/operations.py     |  3 ++-
 6 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 156340f..cafa4a1 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -221,7 +221,8 @@ class MethodWrapper(object):
     if self.watermark_estimator_provider is None:
       self.watermark_estimator_provider = NoOpWatermarkEstimatorProvider()
 
-  def invoke_timer_callback(self, user_state_context, key, window, timestamp):
+  def invoke_timer_callback(
+      self, user_state_context, key, window, timestamp, pane_info):
     # TODO(ccy): support side inputs.
     kwargs = {}
     if self.has_userstate_arguments:
@@ -229,10 +230,10 @@ class MethodWrapper(object):
         kwargs[kw] = user_state_context.get_state(state_spec, key, window)
       for kw, timer_spec in self.timer_args_to_replace.items():
         kwargs[kw] = user_state_context.get_timer(
-            timer_spec, key, window, None, None)
+            timer_spec, key, window, timestamp, pane_info)
 
     if self.timestamp_arg_name:
-      kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp)
+      kwargs[self.timestamp_arg_name] = Timestamp.of(timestamp)
     if self.window_arg_name:
       kwargs[self.window_arg_name] = window
     if self.key_arg_name:
@@ -509,12 +510,12 @@ class DoFnInvoker(object):
     """
     self.signature.teardown_lifecycle_method.method_value()
 
-  def invoke_user_timer(self, timer_spec, key, window, timestamp):
+  def invoke_user_timer(self, timer_spec, key, window, timestamp, pane_info):
     # self.output_processor is Optional, but in practice it won't be None here
     self.output_processor.process_outputs(
         WindowedValue(None, timestamp, (window, )),
         self.signature.timer_methods[timer_spec].invoke_timer_callback(
-            self.user_state_context, key, window, timestamp))
+            self.user_state_context, key, window, timestamp, pane_info))
 
   def invoke_create_watermark_estimator(self, estimator_state):
     return self.signature.create_watermark_estimator_method.method_value(
@@ -983,9 +984,10 @@ class DoFnRunner:
     assert isinstance(self.do_fn_invoker, PerWindowInvoker)
     return self.do_fn_invoker.current_element_progress()
 
-  def process_user_timer(self, timer_spec, key, window, timestamp):
+  def process_user_timer(self, timer_spec, key, window, timestamp, pane_info):
     try:
-      self.do_fn_invoker.invoke_user_timer(timer_spec, key, window, timestamp)
+      self.do_fn_invoker.invoke_user_timer(
+          timer_spec, key, window, timestamp, pane_info)
     except BaseException as exn:
       self._reraise_augmented(exn)
 
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 474ad0d..e2037d6 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -867,7 +867,9 @@ class _ParDoEvaluator(_TransformEvaluator):
         timer_spec,
         self.key_coder.decode(timer_firing.encoded_key),
         timer_firing.window,
-        timer_firing.timestamp)
+        timer_firing.timestamp,
+        # TODO Add paneinfo to timer_firing in DirectRunner
+        None)
 
   def process_element(self, element):
     self.runner.process(element)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
index e62d8a8..b552016 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
@@ -115,6 +115,12 @@ class ListBuffer(object):
     self._inputs = []
     self._grouped_output = None
 
+  def reset(self):
+    """Resets a cleared buffer for reuse."""
+    if not self.cleared:
+      raise RuntimeError('Trying to reset a non-cleared ListBuffer.')
+    self.cleared = False
+
 
 class GroupingBuffer(object):
   """Used to accumulate groupded (shuffled) results."""
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 34b605f..d3e378e 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -966,10 +966,13 @@ class BundleManager(object):
           (result_future.is_done() and result_future.get().error)):
         if isinstance(output, beam_fn_api_pb2.Elements.Timer):
           with BundleManager._lock:
-            self._get_buffer(
+            timer_buffer = self._get_buffer(
                 expected_output_timers[(
                     output.transform_id, output.timer_family_id)],
-                output.transform_id).append(output.timers)
+                output.transform_id)
+            if timer_buffer.cleared:
+              timer_buffer.reset()
+            timer_buffer.append(output.timers)
         if isinstance(output, beam_fn_api_pb2.Elements.Data):
           with BundleManager._lock:
             self._get_buffer(
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 3bc977b..1d2400a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -327,6 +327,7 @@ class FnApiRunnerTest(unittest.TestCase):
 
   def test_pardo_timers(self):
     timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+    state_spec = userstate.CombiningValueStateSpec('num_called', sum)
 
     class TimerDoFn(beam.DoFn):
       def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
@@ -335,7 +336,14 @@ class FnApiRunnerTest(unittest.TestCase):
         timer.set(2 * ts)
 
       @userstate.on_timer(timer_spec)
-      def process_timer(self):
+      def process_timer(
+          self,
+          ts=beam.DoFn.TimestampParam,
+          timer=beam.DoFn.TimerParam(timer_spec),
+          state=beam.DoFn.StateParam(state_spec)):
+        if state.read() == 0:
+          state.add(1)
+          timer.set(timestamp.Timestamp(micros=2 * ts.micros))
         yield 'fired'
 
     with self.create_pipeline() as p:
@@ -345,7 +353,7 @@ class FnApiRunnerTest(unittest.TestCase):
           | beam.ParDo(TimerDoFn())
           | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))
 
-      expected = [('fired', ts) for ts in (20, 200)]
+      expected = [('fired', ts) for ts in (20, 200, 40, 400)]
       assert_that(actual, equal_to(expected))
 
   def test_pardo_timers_clear(self):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index c028c8f..66f12f2 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -685,7 +685,8 @@ class DoOperation(Operation):
         timer_spec,
         timer_data.user_key,
         timer_data.windows[0],
-        timer_data.fire_timestamp)
+        timer_data.fire_timestamp,
+        timer_data.paneinfo)
 
   def finish(self):
     # type: () -> None