You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/08 00:01:01 UTC

[beam] branch master updated: Adding Python samples to the Timely (and stateful) Processing post. (#8448)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d9214f  Adding Python samples to the Timely (and stateful) Processing post. (#8448)
6d9214f is described below

commit 6d9214f451e5136fca5bb95c23f45e3eea0968bb
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue May 7 17:00:49 2019 -0700

    Adding Python samples to the Timely (and stateful) Processing post. (#8448)
    
    * Adding Python snippets for Timers
---
 website/src/_posts/2017-08-28-timely-processing.md | 102 +++++++++++++++++++--
 1 file changed, 92 insertions(+), 10 deletions(-)

diff --git a/website/src/_posts/2017-08-28-timely-processing.md b/website/src/_posts/2017-08-28-timely-processing.md
index a7109e9..4a379bcb 100644
--- a/website/src/_posts/2017-08-28-timely-processing.md
+++ b/website/src/_posts/2017-08-28-timely-processing.md
@@ -198,8 +198,13 @@ new DoFn<Event, EnrichedEvent>() {
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
+class StatefulBufferingFn(beam.DoFn):
+
+  BUFFER_STATE = BagStateSpec('buffer', EventCoder())
+
+  COUNT_STATE = CombiningValueStateSpec('count',
+                                        VarIntCoder(),
+                                        combiners.SumCombineFn())
 ```
 
 Walking through the code, we have:
@@ -234,7 +239,7 @@ new DoFn<Event, EnrichedEvent>() {
     countState.write(count);
     bufferState.add(context.element());
 
-    if (count > MAX_BUFFER_SIZE) {
+    if (count >= MAX_BUFFER_SIZE) {
       for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
         context.output(enrichedEvent);
       }
@@ -248,8 +253,30 @@ new DoFn<Event, EnrichedEvent>() {
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
+class StatefulBufferingFn(beam.DoFn):
+
+  MAX_BUFFER_SIZE = 500
+
+  BUFFER_STATE = BagStateSpec('buffer', EventCoder())
+
+  COUNT_STATE = CombiningValueStateSpec('count',
+                                        VarIntCoder(),
+                                        combiners.SumCombineFn())
+
+  def process(self, element,
+              buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+              count_state=beam.DoFn.StateParam(COUNT_STATE)):
+
+    buffer_state.add(element)
+
+    count_state.add(1)
+    count = count_state.read()
+
+    if count >= StatefulBufferingFn.MAX_BUFFER_SIZE:
+      for event in buffer_state.read():
+        yield event
+      count_state.clear()
+      buffer_state.clear()
 ```
 
 Here is an illustration to accompany the code:
@@ -320,14 +347,39 @@ new DoFn<Event, EnrichedEvent>() {
       for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
         context.output(enrichedEvent);
       }
+      bufferState.clear();
     }
   }
 }
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
+class StatefulBufferingFn(beam.DoFn):
+  …
+
+  EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)
+
+  def process(self, element,
+              w=beam.DoFn.WindowParam,
+              buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+              count_state=beam.DoFn.StateParam(COUNT_STATE),
+              expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
+
+    expiry_timer.set(w.end + ALLOWED_LATENESS)
+
+    … same logic as above …
+
+  @on_timer(EXPIRY_TIMER)
+  def expiry(self,
+             buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+             count_state=beam.DoFn.StateParam(COUNT_STATE)):
+    events = buffer_state.read()
+
+    for event in events:
+      yield event
+
+    buffer_state.clear()
+    count_state.clear()
 ```
 
 Let's unpack the pieces of this snippet:
@@ -403,7 +455,7 @@ new DoFn<Event, EnrichedEvent>() {
 
     boolean staleTimerSet = firstNonNull(staleSetState.read(), false);
     if (firstNonNull(countState.read(), 0) == 0) {
-      staleTimer.offset(MAX_BUFFER_DURATION).setRelative());
+      staleTimer.offset(MAX_BUFFER_DURATION).setRelative();
     }
 
     … same processing logic as above …
@@ -428,8 +480,38 @@ new DoFn<Event, EnrichedEvent>() {
 ```
 
 ```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
+class StatefulBufferingFn(beam.DoFn):
+  …
+
+  STALE_TIMER = TimerSpec('stale', TimeDomain.REAL_TIME)
+
+  MAX_BUFFER_DURATION = 1
+
+  def process(self, element,
+              w=beam.DoFn.WindowParam,
+              buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+              count_state=beam.DoFn.StateParam(COUNT_STATE),
+              expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER),
+              stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
+
+    if count_state.read() == 0:
+      # We set an absolute timestamp here (not an offset like in the Java SDK)
+      stale_timer.set(time.time() + StatefulBufferingFn.MAX_BUFFER_DURATION)
+
+    … same logic as above …
+
+  @on_timer(STALE_TIMER)
+  def stale(self,
+            buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+            count_state=beam.DoFn.StateParam(COUNT_STATE)):
+    events = buffer_state.read()
+
+    for event in events:
+      yield event
+
+    buffer_state.clear()
+    count_state.clear()
+
 ```
 
 Here is an illustration of the final code: