Posted to commits@beam.apache.org by pa...@apache.org on 2019/05/08 00:01:01 UTC
[beam] branch master updated: Adding Python samples to the Timely
(and stateful) Processing post. (#8448)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6d9214f Adding Python samples to the Timely (and stateful) Processing post. (#8448)
6d9214f is described below
commit 6d9214f451e5136fca5bb95c23f45e3eea0968bb
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Tue May 7 17:00:49 2019 -0700
Adding Python samples to the Timely (and stateful) Processing post. (#8448)
* Adding Python snippets for Timers
---
website/src/_posts/2017-08-28-timely-processing.md | 102 +++++++++++++++++++--
1 file changed, 92 insertions(+), 10 deletions(-)
diff --git a/website/src/_posts/2017-08-28-timely-processing.md b/website/src/_posts/2017-08-28-timely-processing.md
index a7109e9..4a379bcb 100644
--- a/website/src/_posts/2017-08-28-timely-processing.md
+++ b/website/src/_posts/2017-08-28-timely-processing.md
@@ -198,8 +198,13 @@ new DoFn<Event, EnrichedEvent>() {
```
```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
+class StatefulBufferingFn(beam.DoFn):
+
+  BUFFER_STATE = BagStateSpec('buffer', EventCoder())
+
+  COUNT_STATE = CombiningValueStateSpec('count',
+                                        VarIntCoder(),
+                                        combiners.SumCombineFn())
```
Walking through the code, we have:
@@ -234,7 +239,7 @@ new DoFn<Event, EnrichedEvent>() {
countState.write(count);
bufferState.add(context.element());
- if (count > MAX_BUFFER_SIZE) {
+ if (count >= MAX_BUFFER_SIZE) {
for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
context.output(enrichedEvent);
}
@@ -248,8 +253,30 @@ new DoFn<Event, EnrichedEvent>() {
```
```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
+class StatefulBufferingFn(beam.DoFn):
+
+  MAX_BUFFER_SIZE = 500
+
+  BUFFER_STATE = BagStateSpec('buffer', EventCoder())
+
+  COUNT_STATE = CombiningValueStateSpec('count',
+                                        VarIntCoder(),
+                                        combiners.SumCombineFn())
+
+  def process(self, element,
+              buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+              count_state=beam.DoFn.StateParam(COUNT_STATE)):
+
+    buffer_state.add(element)
+
+    count_state.add(1)
+    count = count_state.read()
+
+    if count >= StatefulBufferingFn.MAX_BUFFER_SIZE:
+      for event in buffer_state.read():
+        yield event
+      count_state.clear()
+      buffer_state.clear()
```
Here is an illustration to accompany the code:
@@ -320,14 +347,39 @@ new DoFn<Event, EnrichedEvent>() {
for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
context.output(enrichedEvent);
}
+ bufferState.clear();
}
}
}
```
```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
+class StatefulBufferingFn(beam.DoFn):
+  …
+
+  EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)
+
+  def process(self, element,
+              w=beam.DoFn.WindowParam,
+              buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+              count_state=beam.DoFn.StateParam(COUNT_STATE),
+              expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
+
+    expiry_timer.set(w.end + ALLOWED_LATENESS)
+
+    … same logic as above …
+
+  @on_timer(EXPIRY_TIMER)
+  def expiry(self,
+             buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+             count_state=beam.DoFn.StateParam(COUNT_STATE)):
+    events = buffer_state.read()
+
+    for event in events:
+      yield event
+
+    buffer_state.clear()
+    count_state.clear()
```
Let's unpack the pieces of this snippet:
@@ -403,7 +455,7 @@ new DoFn<Event, EnrichedEvent>() {
boolean staleTimerSet = firstNonNull(staleSetState.read(), false);
if (firstNonNull(countState.read(), 0) == 0) {
- staleTimer.offset(MAX_BUFFER_DURATION).setRelative());
+ staleTimer.offset(MAX_BUFFER_DURATION).setRelative();
}
… same processing logic as above …
@@ -428,8 +480,38 @@ new DoFn<Event, EnrichedEvent>() {
```
```py
-# State and timers are not yet supported in Beam's Python SDK.
-# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
+class StatefulBufferingFn(beam.DoFn):
+  …
+
+  STALE_TIMER = TimerSpec('stale', TimeDomain.REAL_TIME)
+
+  MAX_BUFFER_DURATION = 1
+
+  def process(self, element,
+              w=beam.DoFn.WindowParam,
+              buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+              count_state=beam.DoFn.StateParam(COUNT_STATE),
+              expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER),
+              stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
+
+    if count_state.read() == 0:
+      # We set an absolute timestamp here (not an offset like in the Java SDK)
+      stale_timer.set(time.time() + StatefulBufferingFn.MAX_BUFFER_DURATION)
+
+    … same logic as above …
+
+  @on_timer(STALE_TIMER)
+  def stale(self,
+            buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
+            count_state=beam.DoFn.StateParam(COUNT_STATE)):
+    events = buffer_state.read()
+
+    for event in events:
+      yield event
+
+    buffer_state.clear()
+    count_state.clear()
+
```
Here is an illustration of the final code:
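
The buffering behaviour that the Python snippets in this diff describe can also be modelled without Beam. The following is a minimal sketch and not part of the commit: `BufferModel`, `advance_time`, and the explicit `now` argument are hypothetical names introduced only for illustration, and the class stands in for the state of a single key and window. It mirrors two of the flush triggers from the diff: the `count >= MAX_BUFFER_SIZE` check and the stale timer that is set when the first element arrives in an empty buffer.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class BufferModel:
    """Plain-Python stand-in for one key-and-window's state (not a Beam API)."""

    max_buffer_size: int = 500        # mirrors MAX_BUFFER_SIZE in the diff
    max_buffer_duration: float = 1.0  # mirrors MAX_BUFFER_DURATION in the diff
    buffer: List[Any] = field(default_factory=list)  # role of BUFFER_STATE
    count: int = 0                                   # role of COUNT_STATE
    stale_deadline: Optional[float] = None           # role of STALE_TIMER

    def _flush(self) -> List[Any]:
        # Emit everything buffered, then clear both pieces of state.
        # Simplification: the deadline is dropped here as well, whereas the
        # snippets in the diff leave the timer set until it fires or is reset.
        events = list(self.buffer)
        self.buffer.clear()
        self.count = 0
        self.stale_deadline = None
        return events

    def process(self, element: Any, now: float) -> List[Any]:
        # First element into an empty buffer arms the stale deadline,
        # using an absolute timestamp as in the Python snippet.
        if self.count == 0:
            self.stale_deadline = now + self.max_buffer_duration
        self.buffer.append(element)
        self.count += 1
        # Flush as soon as the buffer holds max_buffer_size elements.
        if self.count >= self.max_buffer_size:
            return self._flush()
        return []

    def advance_time(self, now: float) -> List[Any]:
        # Stand-in for the runner firing the stale timer.
        if self.stale_deadline is not None and now >= self.stale_deadline:
            return self._flush()
        return []


model = BufferModel(max_buffer_size=3, max_buffer_duration=10)
size_flush = [model.process(e, now=t) for t, e in enumerate("abc")]
model.process("d", now=5)
stale_flush = model.advance_time(now=15)
```

With `>=` the third element triggers the flush; with the earlier `>` comparison the buffer would have held one element more than the stated maximum before emitting. The event-time expiry timer from the diff is deliberately left out of this model.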