Posted to commits@beam.apache.org by pa...@apache.org on 2019/06/08 22:10:56 UTC

[beam] branch master updated: BEAM-7475: Update programming guide with stateful and timer example (#8740)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ce63125  BEAM-7475: Update programming guide with stateful and timer example (#8740)
ce63125 is described below

commit ce63125c51c6d2acdbfd3ba4c8db52feec4f0c75
Author: Rakesh Kumar <ra...@lyft.com>
AuthorDate: Sat Jun 8 15:10:32 2019 -0700

    BEAM-7475: Update programming guide with stateful and timer example (#8740)
    
    For python only.
---
 website/src/documentation/programming-guide.md | 67 +++++++++++++++++++++++++-
 1 file changed, 66 insertions(+), 1 deletion(-)

diff --git a/website/src/documentation/programming-guide.md b/website/src/documentation/programming-guide.md
index 6887130..9509d50 100644
--- a/website/src/documentation/programming-guide.md
+++ b/website/src/documentation/programming-guide.md
@@ -1602,7 +1602,16 @@ are being used, the window is of type `IntervalWindow`.
      public void processElement(@Element String word, IntervalWindow window) {
   }})
 ```
+```py
+import apache_beam as beam
+
+class ProcessRecord(beam.DoFn):
 
+  def process(self, element, window=beam.DoFn.WindowParam):
+    # Access the window, e.g. window.end.micros
+    pass
+
+```
 {:.language-java}
 **PaneInfo:**
 When triggers are used, Beam provides a `PaneInfo` object that contains information about the current firing. Using `PaneInfo`
@@ -1624,12 +1633,68 @@ The `PipelineOptions` for the current pipeline can always be accessed in a proce
 ```
 
 {:.language-java}
-`@OnTimer` methods can also access many of these parameters. Timestamp, window, `PipelineOptions`, `OutputReceiver`, and
+`@OnTimer` methods can also access many of these parameters. Timestamp, window, key, `PipelineOptions`, `OutputReceiver`, and
 `MultiOutputReceiver` parameters can all be accessed in an `@OnTimer` method. In addition, an `@OnTimer` method can take
 a parameter of type `TimeDomain` which tells whether the timer is based on event time or processing time.
 Timers are explained in more detail in the
 [Timely (and Stateful) Processing with Apache Beam]({{ site.baseurl }}/blog/2017/08/28/timely-processing.html) blog post.
+```py
+
+class StatefulDoFn(beam.DoFn):
+  """An example stateful DoFn with state and timer"""
+
+  BUFFER_STATE_1 = BagStateSpec('buffer1', beam.BytesCoder())
+  BUFFER_STATE_2 = BagStateSpec('buffer2', beam.VarIntCoder())
+  WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)
+
+  def process(self,
+              element,
+              timestamp=beam.DoFn.TimestampParam,
+              window=beam.DoFn.WindowParam,
+              buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
+              buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
+              watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):
+
+    # Do your processing here
+    key, value = element
+    # Read all the data from buffer 1.
+    all_values_in_buffer_1 = list(buffer_1.read())
+
+    if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
+      # Clear the buffer if the required conditions are met.
+      buffer_1.clear()
+
+    # Add the value to buffer 2.
+    buffer_2.add(value)
+
+    if StatefulDoFn._all_condition_met():
+      # Clear the timer if certain conditions are met and you don't want
+      # the callback method to fire.
+      watermark_timer.clear()
+
+    yield element
+
+  @on_timer(WATERMARK_TIMER)
+  def on_expiry_1(self,
+                  timestamp=beam.DoFn.TimestampParam,
+                  window=beam.DoFn.WindowParam,
+                  key=beam.DoFn.KeyParam,
+                  buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
+                  buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
+    # The window and key parameters are especially useful for debugging.
+    yield 'expired1'
+
+  @staticmethod
+  def _all_condition_met():
+    # Some business logic
+    return True
+
+  @staticmethod
+  def _is_clear_buffer_1_required(buffer_1_data):
+    # Some business logic
+    return True
 
+```
 ### 4.6. Composite transforms {#composite-transforms}
 
 Transforms can have a nested structure, where a complex transform performs