Posted to commits@beam.apache.org by al...@apache.org on 2020/06/05 18:06:10 UTC

[beam] branch master updated: [BEAM-10112] Add state and timer python examples to website

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fef8cfe  [BEAM-10112] Add state and timer python examples to website
     new d4ad5d4  Merge pull request #11882 from y1chi/BEAM-10112
fef8cfe is described below

commit fef8cfe01ce5b0053de5e8b8dbe4b78886191c2f
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Mon Jun 1 13:33:09 2020 -0700

    [BEAM-10112] Add state and timer python examples to website
---
 .../content/en/documentation/programming-guide.md  | 136 ++++++++++++++++++++-
 1 file changed, 132 insertions(+), 4 deletions(-)

diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 82bd053..ae3d955 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -4337,7 +4337,7 @@ written for that key. State is always fully scoped only to the current processin
 
 Windowing can still be used together with stateful processing. All state for a key is scoped to the current window. This
 means that the first time a key is seen for a given window any state reads will return empty, and that a runner can
-garbage collect state when a window is completed. It's also often useful to use Beam's windowed aggegations prior to
+garbage collect state when a window is completed. It's also often useful to use Beam's windowed aggregations prior to
 the stateful operator. For example, using a combiner to preaggregate data, and then storing aggregated data inside of
 state. Merging windows are not currently supported when using state and timers.
 
@@ -4346,9 +4346,17 @@ care must be taken to remember that the elements in input PCollection have no gu
 program logic is resilient to this. Unit tests written using the DirectRunner will shuffle the order of element
 processing, and are recommended to test for correctness.
 
+{{< paragraph class="language-java" >}}
 In Java DoFn declares states to be accessed by creating final `StateSpec` member variables representing each state. Each
 state must be named using the `StateId` annotation; this name is unique to a ParDo in the graph and has no relation
 to other nodes in the graph. A `DoFn` can declare multiple state variables.
+{{< /paragraph >}}
+
+{{< paragraph class="language-py" >}}
+In Python, a DoFn declares the states it accesses by creating `StateSpec` class member variables, one per state. Each
+`StateSpec` is initialized with a name; this name is unique to a ParDo in the graph and has no relation
+to other nodes in the graph. A `DoFn` can declare multiple state variables.
+{{< /paragraph >}}
 
 ### 11.1 Types of state {#types-of-state}
 
@@ -4404,6 +4412,17 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class CombiningStateDoFn(DoFn):
+  SUM_TOTAL = CombiningValueStateSpec('total', sum)
+  
+  def process(self, element, state=DoFn.StateParam(SUM_TOTAL)):
+    state.add(1)
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'Combine state pardo' >> beam.ParDo(CombiningStateDoFn()))
+{{< /highlight >}}
+
 #### BagState
 
 A common use case for state is to accumulate multiple elements. `BagState` allows for accumulating an unordered set
@@ -4431,6 +4450,21 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class BagStateDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
+  
+  def process(self, element_pair, state=DoFn.StateParam(ALL_ELEMENTS)):
+    state.add(element_pair[1])
+    if should_fetch():
+      all_elements = list(state.read())
+      process_values(all_elements)
+      state.clear()
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'Bag state pardo' >> beam.ParDo(BagStateDoFn()))
+{{< /highlight >}}
+
 ### 11.2 Deferred state reads {#deferred-state-reads}
 
 When a `DoFn` contains multiple state specifications, reading each one in order can be slow. Calling the `read()` function
@@ -4478,7 +4512,7 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
     }
    
     // The runner can now batch all three states into a single read, reducing latency.
-     processState1(state1.read());
+    processState1(state1.read());
     processState2(state2.read());
     processState3(state3.read());
   }
@@ -4520,6 +4554,28 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class EventTimerDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
+  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
+  
+  def process(self, 
+              element_pair, 
+              t = DoFn.TimestampParam,
+              buffer = DoFn.StateParam(ALL_ELEMENTS), 
+              timer = DoFn.TimerParam(TIMER)):
+    buffer.add(element_pair[1])
+    # Set an event-time timer to the element timestamp.
+    timer.set(t)
+  
+  @on_timer(TIMER)
+  def expiry_callback(self, buffer = DoFn.StateParam(ALL_ELEMENTS)):
+    buffer.clear()
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'EventTime timer pardo' >> beam.ParDo(EventTimerDoFn()))
+{{< /highlight >}}
+
 #### 11.3.2 Processing-time timers {#processing-time-timers}
 
 Processing-time timers fire when the real wall-clock time passes. This is often used to create larger batches of data
@@ -4527,7 +4583,7 @@ before processing. It can also be used to schedule events that should occur at a
 event-time timers, processing-time timers are per key - each key has a separate copy of the timer.
 
 While processing-time timers can be set to an absolute timestamp, it is very common to set them to an offset relative 
-to the current time. The `Timer.offset` and `Timer.setRelative` methods can be used to accomplish this.
+to the current time. In Java, the `Timer.offset` and `Timer.setRelative` methods can be used to accomplish this.
 
 {{< highlight java >}}
 PCollection<KV<String, ValueT>> perUser = readPerUser();
@@ -4546,6 +4602,28 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class ProcessingTimerDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
+  TIMER = TimerSpec('timer', TimeDomain.REAL_TIME)
+  
+  def process(self, 
+              element_pair, 
+              buffer = DoFn.StateParam(ALL_ELEMENTS), 
+              timer = DoFn.TimerParam(TIMER)):
+    buffer.add(element_pair[1])
+    # Set a timer to go off 30 seconds in the future.
+    timer.set(Timestamp.now() + Duration(seconds=30))
+  
+  @on_timer(TIMER)
+  def expiry_callback(self, buffer = DoFn.StateParam(ALL_ELEMENTS)):
+    # Process timer.
+    buffer.clear()
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'ProcessingTime timer pardo' >> beam.ParDo(ProcessingTimerDoFn()))
+{{< /highlight >}}
+
 #### 11.3.3 Dynamic timer tags {#dynamic-timer-tags}
 
 Beam also supports dynamically setting a timer tag using `TimerMap`. This allows for setting multiple different timers
@@ -4573,6 +4651,9 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
 }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+To be supported, see BEAM-9602
+{{< /highlight >}}
 #### 11.3.4 Timer output timestamps {#timer-output-timestamps}
 
 By default, event-time timers will hold the output watermark of the `ParDo` to the timestamp of the timer. This means
@@ -4684,7 +4765,7 @@ performance. There are two common strategies for garbage collecting state.
 All state and timers for a key is scoped to the window it is in. This means that depending on the timestamp of the 
 input element the ParDo will see different values for the state depending on the window that element falls into. In
 addition, once the input watermark passes the end of the window, the runner should garbage collect all state for that
-window. (note: if allowed lateness is set to a positive value for the window, the runner must wait for the watemark to
+window. (note: if allowed lateness is set to a positive value for the window, the runner must wait for the watermark to
 pass the end of the window plus the allowed lateness before garbage collecting state). This can be used as a 
 garbage-collection strategy.
 
@@ -4704,6 +4785,20 @@ perUser.apply(Window.into(CalendarWindows.days(1)
          }));
 {{< /highlight >}}
 
+{{< highlight python >}}
+class StateDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('buffer', coders.VarIntCoder())
+  
+  def process(self, 
+              element_pair, 
+              buffer = DoFn.StateParam(ALL_ELEMENTS)):
+    ...
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'Windowing' >> beam.WindowInto(FixedWindows(60 * 60 * 24))
+       | 'DoFn' >> beam.ParDo(StateDoFn()))
+{{< /highlight >}}
+
 This `ParDo` stores state per day. Once the pipeline is done processing data for a given day, all the state for that
 day is garbage collected.
 
@@ -4751,6 +4846,39 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  }
 {{< /highlight >}}
 
+{{< highlight python >}}
+class UserDoFn(DoFn):
+  ALL_ELEMENTS = BagStateSpec('state', coders.VarIntCoder())
+  MAX_TIMESTAMP = CombiningValueStateSpec('max_timestamp_seen', max)
+  TIMER = TimerSpec('gc-timer', TimeDomain.WATERMARK)
+  
+  def process(self, 
+              element, 
+              t = DoFn.TimestampParam,
+              state = DoFn.StateParam(ALL_ELEMENTS), 
+              max_timestamp = DoFn.StateParam(MAX_TIMESTAMP),
+              timer = DoFn.TimerParam(TIMER)):
+    update_state(state, element)
+    max_timestamp.add(t.micros)
+    
+    # Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so 
+    # as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's
+    # worth of event time (as measured by the watermark), then the gc timer will fire.
+    expiration_time = Timestamp(micros=max_timestamp.read()) + Duration(seconds=60*60)
+    timer.set(expiration_time)
+  
+  @on_timer(TIMER)
+  def expiry_callback(self, 
+                      state = DoFn.StateParam(ALL_ELEMENTS),
+                      max_timestamp = DoFn.StateParam(MAX_TIMESTAMP)):
+    state.clear()
+    max_timestamp.clear()
+  
+    
+_ = (p | 'Read per user' >> ReadPerUser()
+       | 'User DoFn' >> beam.ParDo(UserDoFn()))
+{{< /highlight >}}
+
 ### 11.5 State and timers examples {#state-timers-examples}
 
 Following are some example uses of state and timers
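
The comment in the `UserDoFn` example added by this commit says that setting the gc timer again overwrites the previous one, so the state only expires after an hour of inactivity. The sketch below is a stdlib-only model of that behaviour, not Beam code: `KeyState`, `advance_watermark`, and the rule that the timer fires once the watermark reaches its timestamp are all assumptions made for illustration.

```python
# Stdlib-only model of the gc-timer pattern. KeyState and its methods are
# hypothetical names for illustration; none of this is the Beam API.
ONE_HOUR = 60 * 60  # seconds of event time


class KeyState:
  """State for one key: a buffer plus a single overwritable timer."""

  def __init__(self):
    self.buffer = []
    self.max_timestamp = None
    self.timer = None  # firing time of the gc timer, or None if unset

  def process(self, value, timestamp):
    self.buffer.append(value)
    if self.max_timestamp is None or timestamp > self.max_timestamp:
      self.max_timestamp = timestamp
    # Setting the timer again replaces the earlier firing time.
    self.timer = self.max_timestamp + ONE_HOUR

  def advance_watermark(self, watermark):
    """Fires the timer if the watermark has reached it (assumed rule)."""
    if self.timer is not None and watermark >= self.timer:
      self.buffer.clear()
      self.max_timestamp = None
      self.timer = None
      return True
    return False


state = KeyState()
state.process(1, timestamp=0)      # timer set to 3600
state.process(2, timestamp=1800)   # timer overwritten, now 5400
fired_early = state.advance_watermark(3600)  # old deadline no longer applies
fired_late = state.advance_watermark(5400)   # one hour after last activity
```

In this model the first watermark advance does not fire, because the second element moved the deadline; only the second advance clears the buffer.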