You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/05/28 09:29:49 UTC

[beam] branch master updated: [BEAM-7141] Add window & timestamp in timer callback method argument (#8408)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 882eca8  [BEAM-7141] Add window & timestamp in timer callback method argument (#8408)
882eca8 is described below

commit 882eca84327629ddd8f167c764573ee53a4e7ed8
Author: Rakesh Kumar <ra...@gmail.com>
AuthorDate: Tue May 28 02:29:25 2019 -0700

    [BEAM-7141] Add window & timestamp in timer callback method argument (#8408)
---
 sdks/python/apache_beam/runners/common.pxd         |  2 ++
 sdks/python/apache_beam/runners/common.py          | 29 ++++++++++++++----
 .../apache_beam/transforms/userstate_test.py       | 35 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 1d87507..b901c68 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -37,6 +37,8 @@ cdef class MethodWrapper(object):
   cdef bint has_userstate_arguments
   cdef object state_args_to_replace
   cdef object timer_args_to_replace
+  cdef object timestamp_arg_name
+  cdef object window_arg_name
 
 
 cdef class DoFnSignature(object):
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index cae0d4c..6d821e3 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -44,6 +44,7 @@ from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.counters import Counter
 from apache_beam.utils.counters import CounterName
+from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
 
 
@@ -168,6 +169,9 @@ class MethodWrapper(object):
     self.has_userstate_arguments = False
     self.state_args_to_replace = {}
     self.timer_args_to_replace = {}
+    self.timestamp_arg_name = None
+    self.window_arg_name = None
+
     for kw, v in zip(args[-len(defaults):], defaults):
       if isinstance(v, core.DoFn.StateParam):
         self.state_args_to_replace[kw] = v.state_spec
@@ -175,15 +179,30 @@ class MethodWrapper(object):
       elif isinstance(v, core.DoFn.TimerParam):
         self.timer_args_to_replace[kw] = v.timer_spec
         self.has_userstate_arguments = True
-
-  def invoke_timer_callback(self, user_state_context, key, window):
-    # TODO(ccy): support WindowParam, TimestampParam and side inputs.
+      elif v == core.DoFn.TimestampParam:
+        self.timestamp_arg_name = kw
+      elif v == core.DoFn.WindowParam:
+        self.window_arg_name = kw
+
+  def invoke_timer_callback(self,
+                            user_state_context,
+                            key,
+                            window,
+                            timestamp):
+    # TODO(ccy): support side inputs.
+    kwargs = {}
     if self.has_userstate_arguments:
-      kwargs = {}
       for kw, state_spec in self.state_args_to_replace.items():
         kwargs[kw] = user_state_context.get_state(state_spec, key, window)
       for kw, timer_spec in self.timer_args_to_replace.items():
         kwargs[kw] = user_state_context.get_timer(timer_spec, key, window)
+
+    if self.timestamp_arg_name:
+      kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp)
+    if self.window_arg_name:
+      kwargs[self.window_arg_name] = window
+
+    if kwargs:
       return self.method_value(**kwargs)
     else:
       return self.method_value()
@@ -384,7 +403,7 @@ class DoFnInvoker(object):
     self.output_processor.process_outputs(
         WindowedValue(None, timestamp, (window,)),
         self.signature.timer_methods[timer_spec].invoke_timer_callback(
-            self.user_state_context, key, window))
+            self.user_state_context, key, window, timestamp))
 
   def invoke_split(self, element, restriction):
     return self.signature.split_method.method_value(element, restriction)
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 0a3e13c..53a7e36 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -27,11 +27,14 @@ from apache_beam.coders import BytesCoder
 from apache_beam.coders import IterableCoder
 from apache_beam.coders import StrUtf8Coder
 from apache_beam.coders import VarIntCoder
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners.common import DoFnSignature
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import equal_to
+from apache_beam.transforms import trigger
 from apache_beam.transforms import userstate
+from apache_beam.transforms import window
 from apache_beam.transforms.combiners import ToListCombineFn
 from apache_beam.transforms.combiners import TopCombineFn
 from apache_beam.transforms.core import DoFn
@@ -557,6 +560,38 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
         [('timer1', 10), ('timer2', 20), ('timer3', 30)],
         sorted(StatefulDoFnOnDirectRunnerTest.all_records))
 
+  def test_timer_output_timestamp_and_window(self):
+
+    class TimerEmittingStatefulDoFn(DoFn):
+      EMIT_TIMER_1 = TimerSpec('emit1', TimeDomain.WATERMARK)
+
+      def process(self, element, timer1=DoFn.TimerParam(EMIT_TIMER_1)):
+        timer1.set(10)
+
+      @on_timer(EMIT_TIMER_1)
+      def emit_callback_1(self,
+                          window=DoFn.WindowParam,
+                          ts=DoFn.TimestampParam):
+        yield ('timer1', int(ts), int(window.start), int(window.end))
+
+    pipeline_options = PipelineOptions()
+    with TestPipeline(options=pipeline_options) as p:
+      test_stream = (TestStream()
+                     .advance_watermark_to(10)
+                     .add_elements([1]))
+      (p
+       | test_stream
+       | beam.Map(lambda x: ('mykey', x))
+       | "window_into" >> beam.WindowInto(
+           window.FixedWindows(5),
+           accumulation_mode=trigger.AccumulationMode.DISCARDING)
+       | beam.ParDo(TimerEmittingStatefulDoFn())
+       | beam.ParDo(self.record_dofn()))
+
+    self.assertEqual(
+        [('timer1', 10, 10, 15)],
+        sorted(StatefulDoFnOnDirectRunnerTest.all_records))
+
   def test_index_assignment(self):
     class IndexAssigningStatefulDoFn(DoFn):
       INDEX_STATE = BagStateSpec('index', VarIntCoder())