You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/05/28 09:29:49 UTC
[beam] branch master updated: [BEAM-7141] Add window & timestamp in timer callback method argument (#8408)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 882eca8 [BEAM-7141] Add window & timestamp in timer callback method argument (#8408)
882eca8 is described below
commit 882eca84327629ddd8f167c764573ee53a4e7ed8
Author: Rakesh Kumar <ra...@gmail.com>
AuthorDate: Tue May 28 02:29:25 2019 -0700
[BEAM-7141] Add window & timestamp in timer callback method argument (#8408)
---
sdks/python/apache_beam/runners/common.pxd | 2 ++
sdks/python/apache_beam/runners/common.py | 29 ++++++++++++++----
.../apache_beam/transforms/userstate_test.py | 35 ++++++++++++++++++++++
3 files changed, 61 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 1d87507..b901c68 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -37,6 +37,8 @@ cdef class MethodWrapper(object):
cdef bint has_userstate_arguments
cdef object state_args_to_replace
cdef object timer_args_to_replace
+ cdef object timestamp_arg_name
+ cdef object window_arg_name
cdef class DoFnSignature(object):
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index cae0d4c..6d821e3 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -44,6 +44,7 @@ from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms.window import WindowFn
from apache_beam.utils.counters import Counter
from apache_beam.utils.counters import CounterName
+from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
@@ -168,6 +169,9 @@ class MethodWrapper(object):
self.has_userstate_arguments = False
self.state_args_to_replace = {}
self.timer_args_to_replace = {}
+ self.timestamp_arg_name = None
+ self.window_arg_name = None
+
for kw, v in zip(args[-len(defaults):], defaults):
if isinstance(v, core.DoFn.StateParam):
self.state_args_to_replace[kw] = v.state_spec
@@ -175,15 +179,30 @@ class MethodWrapper(object):
elif isinstance(v, core.DoFn.TimerParam):
self.timer_args_to_replace[kw] = v.timer_spec
self.has_userstate_arguments = True
-
- def invoke_timer_callback(self, user_state_context, key, window):
- # TODO(ccy): support WindowParam, TimestampParam and side inputs.
+ elif v == core.DoFn.TimestampParam:
+ self.timestamp_arg_name = kw
+ elif v == core.DoFn.WindowParam:
+ self.window_arg_name = kw
+
+ def invoke_timer_callback(self,
+ user_state_context,
+ key,
+ window,
+ timestamp):
+ # TODO(ccy): support side inputs.
+ kwargs = {}
if self.has_userstate_arguments:
- kwargs = {}
for kw, state_spec in self.state_args_to_replace.items():
kwargs[kw] = user_state_context.get_state(state_spec, key, window)
for kw, timer_spec in self.timer_args_to_replace.items():
kwargs[kw] = user_state_context.get_timer(timer_spec, key, window)
+
+ if self.timestamp_arg_name:
+ kwargs[self.timestamp_arg_name] = Timestamp(seconds=timestamp)
+ if self.window_arg_name:
+ kwargs[self.window_arg_name] = window
+
+ if kwargs:
return self.method_value(**kwargs)
else:
return self.method_value()
@@ -384,7 +403,7 @@ class DoFnInvoker(object):
self.output_processor.process_outputs(
WindowedValue(None, timestamp, (window,)),
self.signature.timer_methods[timer_spec].invoke_timer_callback(
- self.user_state_context, key, window))
+ self.user_state_context, key, window, timestamp))
def invoke_split(self, element, restriction):
return self.signature.split_method.method_value(element, restriction)
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 0a3e13c..53a7e36 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -27,11 +27,14 @@ from apache_beam.coders import BytesCoder
from apache_beam.coders import IterableCoder
from apache_beam.coders import StrUtf8Coder
from apache_beam.coders import VarIntCoder
+from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.common import DoFnSignature
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import equal_to
+from apache_beam.transforms import trigger
from apache_beam.transforms import userstate
+from apache_beam.transforms import window
from apache_beam.transforms.combiners import ToListCombineFn
from apache_beam.transforms.combiners import TopCombineFn
from apache_beam.transforms.core import DoFn
@@ -557,6 +560,38 @@ class StatefulDoFnOnDirectRunnerTest(unittest.TestCase):
[('timer1', 10), ('timer2', 20), ('timer3', 30)],
sorted(StatefulDoFnOnDirectRunnerTest.all_records))
+ def test_timer_output_timestamp_and_window(self):
+
+ class TimerEmittingStatefulDoFn(DoFn):
+ EMIT_TIMER_1 = TimerSpec('emit1', TimeDomain.WATERMARK)
+
+ def process(self, element, timer1=DoFn.TimerParam(EMIT_TIMER_1)):
+ timer1.set(10)
+
+ @on_timer(EMIT_TIMER_1)
+ def emit_callback_1(self,
+ window=DoFn.WindowParam,
+ ts=DoFn.TimestampParam):
+ yield ('timer1', int(ts), int(window.start), int(window.end))
+
+ pipeline_options = PipelineOptions()
+ with TestPipeline(options=pipeline_options) as p:
+ test_stream = (TestStream()
+ .advance_watermark_to(10)
+ .add_elements([1]))
+ (p
+ | test_stream
+ | beam.Map(lambda x: ('mykey', x))
+ | "window_into" >> beam.WindowInto(
+ window.FixedWindows(5),
+ accumulation_mode=trigger.AccumulationMode.DISCARDING)
+ | beam.ParDo(TimerEmittingStatefulDoFn())
+ | beam.ParDo(self.record_dofn()))
+
+ self.assertEqual(
+ [('timer1', 10, 10, 15)],
+ sorted(StatefulDoFnOnDirectRunnerTest.all_records))
+
def test_index_assignment(self):
class IndexAssigningStatefulDoFn(DoFn):
INDEX_STATE = BagStateSpec('index', VarIntCoder())