Posted to commits@beam.apache.org by pa...@apache.org on 2019/06/08 22:10:56 UTC
[beam] branch master updated: BEAM-7475: Update programming guide with stateful and timer example (#8740)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ce63125 BEAM-7475: Update programming guide with stateful and timer example (#8740)
ce63125 is described below
commit ce63125c51c6d2acdbfd3ba4c8db52feec4f0c75
Author: Rakesh Kumar <ra...@lyft.com>
AuthorDate: Sat Jun 8 15:10:32 2019 -0700
BEAM-7475: Update programming guide with stateful and timer example (#8740)
For python only.
---
website/src/documentation/programming-guide.md | 67 +++++++++++++++++++++++++-
1 file changed, 66 insertions(+), 1 deletion(-)
diff --git a/website/src/documentation/programming-guide.md b/website/src/documentation/programming-guide.md
index 6887130..9509d50 100644
--- a/website/src/documentation/programming-guide.md
+++ b/website/src/documentation/programming-guide.md
@@ -1602,7 +1602,16 @@ are being used, the window is of type `IntervalWindow`.
public void processElement(@Element String word, IntervalWindow window) {
}})
```
+```py
+import apache_beam as beam
+
+class ProcessRecord(beam.DoFn):
+  def process(self, element, window=beam.DoFn.WindowParam):
+    # Access the window here, e.g. window.end.micros.
+    pass
+```
{:.language-java}
**PaneInfo:**
When triggers are used, Beam provides a `PaneInfo` object that contains information about the current firing. Using `PaneInfo`
@@ -1624,12 +1633,68 @@ The `PipelineOptions` for the current pipeline can always be accessed in a proce
```
{:.language-java}
-`@OnTimer` methods can also access many of these parameters. Timestamp, window, `PipelineOptions`, `OutputReceiver`, and
+`@OnTimer` methods can also access many of these parameters. Timestamp, window, key, `PipelineOptions`, `OutputReceiver`, and
`MultiOutputReceiver` parameters can all be accessed in an `@OnTimer` method. In addition, an `@OnTimer` method can take
a parameter of type `TimeDomain` which tells whether the timer is based on event time or processing time.
Timers are explained in more detail in the
[Timely (and Stateful) Processing with Apache Beam]({{ site.baseurl }}/blog/2017/08/28/timely-processing.html) blog post.
+```py
+import apache_beam as beam
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.transforms.userstate import BagStateSpec
+from apache_beam.transforms.userstate import TimerSpec
+from apache_beam.transforms.userstate import on_timer
+
+class StatefulDoFn(beam.DoFn):
+  """An example stateful DoFn with state and a timer."""
+
+  BUFFER_STATE_1 = BagStateSpec('buffer1', beam.coders.BytesCoder())
+  BUFFER_STATE_2 = BagStateSpec('buffer2', beam.coders.VarIntCoder())
+  WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)
+
+  def process(self,
+              element,
+              timestamp=beam.DoFn.TimestampParam,
+              window=beam.DoFn.WindowParam,
+              buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
+              buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
+              watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):
+
+    # Do your processing here. Stateful DoFns require key-value input.
+    key, value = element
+    # Read all the data from buffer1.
+    all_values_in_buffer_1 = list(buffer_1.read())
+
+    if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
+      # Clear the buffer data if the required conditions are met.
+      buffer_1.clear()
+
+    # Add the value to buffer2.
+    buffer_2.add(value)
+
+    # Set an event-time timer at the element's timestamp; the callback
+    # fires once the watermark passes that time.
+    watermark_timer.set(timestamp)
+
+    if StatefulDoFn._all_condition_met():
+      # Clear the timer if certain conditions are met and you don't want
+      # the callback method to be triggered.
+      watermark_timer.clear()
+
+    yield element
+
+  @on_timer(WATERMARK_TIMER)
+  def on_expiry_1(self,
+                  timestamp=beam.DoFn.TimestampParam,
+                  window=beam.DoFn.WindowParam,
+                  key=beam.DoFn.KeyParam,
+                  buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
+                  buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
+    # The window and key parameters are especially useful for debugging.
+    yield 'expired1'
+
+  @staticmethod
+  def _all_condition_met():
+    # Some business logic
+    return True
+
+  @staticmethod
+  def _is_clear_buffer_1_required(buffer_1_data):
+    # Some business logic
+    return True
+```
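The stateful example above shows the API surface but not how keyed bag state and a watermark (event-time) timer interact at run time. The following is a minimal pure-Python sketch of that interaction, assuming a single process and integer timestamps. It is a toy model, not Apache Beam's API or runner behavior: `ToyStatefulModel`, `process`, `clear_timer` and `advance_watermark` are hypothetical names chosen for illustration, and the firing rule (a timer fires once the watermark reaches its timestamp) is a simplifying assumption.

```python
from collections import defaultdict


class ToyStatefulModel(object):
  """Hypothetical single-process model of keyed bag state plus one
  event-time (watermark) timer per key. Not part of Apache Beam."""

  def __init__(self):
    self.buffers = defaultdict(list)  # key -> buffered values ("bag state")
    self.timers = {}                  # key -> timestamp the timer is set for
    self.watermark = float('-inf')

  def process(self, key, value, window_end):
    # Analogous to adding to a bag state: the buffer is scoped to the key.
    self.buffers[key].append(value)
    # Analogous to setting a timer: re-setting overwrites the old time.
    self.timers[key] = window_end

  def clear_timer(self, key):
    # Analogous to clearing a timer: the callback will not run for this
    # key, but the key's buffered values are left in place.
    self.timers.pop(key, None)

  def advance_watermark(self, new_watermark):
    # The watermark never moves backwards.
    self.watermark = max(self.watermark, new_watermark)
    # Assumption of this model: a timer fires once the watermark reaches it.
    due = sorted((ts, key) for key, ts in self.timers.items()
                 if ts <= self.watermark)
    fired = []
    for ts, key in due:
      del self.timers[key]
      # The timer callback sees the key and drains that key's buffer.
      fired.append((key, ts, self.buffers.pop(key, [])))
    return fired


model = ToyStatefulModel()
model.process('a', 1, window_end=10)
model.process('a', 2, window_end=10)
model.process('b', 3, window_end=20)
model.process('c', 4, window_end=10)
model.clear_timer('c')
print(model.advance_watermark(15))  # [('a', 10, [1, 2])]
print(model.advance_watermark(25))  # [('b', 20, [3])]
print(dict(model.buffers))          # {'c': [4]}
```

The model makes two points explicit: state and timers are scoped per key, so advancing the watermark to 15 flushes only key `a`; and clearing a timer cancels its callback without clearing the key's buffered state, which is why key `c` still holds `[4]`. Garbage collection of state at window expiry is deliberately not modeled.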
### 4.6. Composite transforms {#composite-transforms}
Transforms can have a nested structure, where a complex transform performs