Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/02/01 02:17:00 UTC

[jira] [Work logged] (BEAM-11545) State & timer for batched RPC calls pattern

     [ https://issues.apache.org/jira/browse/BEAM-11545?focusedWorklogId=545017&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-545017 ]

ASF GitHub Bot logged work on BEAM-11545:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Feb/21 02:16
            Start Date: 01/Feb/21 02:16
    Worklog Time Spent: 10m 
      Work Description: rezarokni commented on a change in pull request #13643:
URL: https://github.com/apache/beam/pull/13643#discussion_r567526773



##########
File path: website/www/site/content/en/documentation/patterns/state-and-timers.md
##########
@@ -0,0 +1,143 @@
+---
+title: "State and timers patterns"
+---
+
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# State and timers for calling external services
+
+Usually, an Apache Beam pipeline can be authored with out-of-the-box tools and transforms like _ParDo_, _Window_ and _GroupByKey_. However, when you want tighter control, you can keep state in an otherwise stateless _DoFn_.
+
+State is kept on a per-key (and, in the case of streaming pipelines, per-window) basis, and as such, the input to your stateful DoFn needs to be keyed (e.g. by the customer identifier if you're tracking clicks from an e-commerce website).

Review comment:
       It's still per window even in batch (it just happens to be the global window most of the time, but not always).
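   To make the scoping concrete, a minimal, hypothetical fragment (readKeyedClicks() is only a stand-in for whatever produces the keyed input, not a real API): applying a Window before the stateful DoFn scopes its state per key and per window, in batch as well as streaming; dropping the Window step just means everything lives in the single global window.

```java
// Hypothetical fragment: the BagState/Timer in BatchRequestForRecommendationAI are scoped
// per key AND per window, whether the pipeline runs in batch or streaming mode.
p.apply(readKeyedClicks())  // assumed source of KV<String, GenericJson>, keyed by customer id
 .apply(Window.<KV<String, GenericJson>>into(FixedWindows.of(Duration.standardMinutes(10))))
 .apply(ParDo.of(new BatchRequestForRecommendationAI(5000)));
```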

##########
File path: website/www/site/content/en/documentation/patterns/state-and-timers.md
##########
@@ -0,0 +1,143 @@
+---
+title: "State and timers patterns"
+---
+
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# State and timers for calling external services
+
+Usually, an Apache Beam pipeline can be authored with out-of-the-box tools and transforms like _ParDo_, _Window_ and _GroupByKey_. However, when you want tighter control, you can keep state in an otherwise stateless _DoFn_.
+
+State is kept on a per-key (and, in the case of streaming pipelines, per-window) basis, and as such, the input to your stateful DoFn needs to be keyed (e.g. by the customer identifier if you're tracking clicks from an e-commerce website).
+
+Examples of use cases are: assigning a unique ID to each element, joining streams of data in 'more exotic' ways, or batching up API calls to external services. In this section we'll go over the last one in particular.
+
+Make sure to check the [docs](https://beam.apache.org/documentation/programming-guide/#state-and-timers) for deeper understanding.
+
+The stateful DoFn we're developing will buffer incoming elements by storing them in a state cell and will output them, in batches of a given size (e.g. 5000), when the window expires.
+
+This is implemented by constructing a DoFn with:
+
+- a BagStateSpec in Python (a BagState in Java) called "buffer", which will be used to write elements to and read elements from.
+- a TimerSpec which will call back at a certain point in time - in this case at the end of the window.
+- a `process` function that adds newly incoming events to the state and sets the timer to the end of the window.
+- an `expiry` function in Python (an `onTimer` function in Java) that reads the state on expiry and outputs batches of the data, ready to be sent to a remote external service in the next transform.
+
+The result in Python:
+
+```python
+class _BatchItems(DoFn):
+    elements_bag = BagStateSpec('buffer', IterableCoder(PickleCoder()))
+    event_timer = TimerSpec('event_timer', TimeDomain.WATERMARK)
+
+    def __init__(self, max_batch_size=5000):
+        self.max_batch_size = max_batch_size
+
+    def process(self,
+                element,
+                window=DoFn.WindowParam,
+                buffer_state=DoFn.StateParam(elements_bag),
+                event_timer=DoFn.TimerParam(event_timer)):
+
+        buffer_state.add(element)
+        event_timer.set(window.max_timestamp())
+
+    @on_timer(event_timer)
+    def expiry(self, buffer_state=DoFn.StateParam(elements_bag), event_timer=DoFn.TimerParam(event_timer)):
+        batch = [element for element in buffer_state.read()]
+        if not batch:
+            return
+
+        key, _ = batch[0]
+        current_batch_size = itertools.count()
+        output = []
+
+        if batch:
+            for event in batch:
+                clear_buffer = next(current_batch_size) >= self.max_batch_size
+                if clear_buffer:
+                    yield (key, output)
+                    output.clear()
+                    current_batch_size = itertools.count()
+                output.append(event[1])
+
+        if len(output) > 0:
+            yield (key, output)
+```
+
+and in Java:
+
+```java
+class BatchRequestForRecommendationAI extends DoFn<KV<String, GenericJson>, KV<String, Iterable<GenericJson>>> {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchRequestForRecommendationAI.class);
+
+    private final Counter numberOfRowsBagged =
+            Metrics.counter(BatchRequestForRecommendationAI.class, "numberOfRowsBagged");
+
+    private final Integer maxBatchSize;
+
+    @StateId("elementsBag")
+    private final StateSpec<BagState<KV<String, GenericJson>>> elementsBag = StateSpecs.bag();
+
+    @TimerId("eventTimer")
+    private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    public BatchRequestForRecommendationAI(Integer maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    @ProcessElement
+    public void process(
+            @Element KV<String, GenericJson> element,
+            @StateId("elementsBag") BagState<KV<String, GenericJson>> elementsBag,
+            @TimerId("eventTimer") Timer eventTimer,
+            BoundedWindow w) {
+        elementsBag.add(element);
+        eventTimer.set(w.maxTimestamp());
+    }
+
+    @OnTimer("eventTimer")
+    public void onTimer(
+            @StateId("elementsBag") BagState<KV<String, GenericJson>> elementsBag,
+            OutputReceiver<KV<String, Iterable<GenericJson>>> output) {
+        if (elementsBag.read().iterator().hasNext()) {

Review comment:
       Easier to read if you assign to an iterator object before this?
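   For instance (only a sketch of that suggestion, keeping the rest of the method as in the example):

```java
// Sketch: read the bag once and keep a handle on it, instead of calling
// elementsBag.read().iterator() on consecutive lines.
Iterable<KV<String, GenericJson>> bufferedElements = elementsBag.read();
Iterator<KV<String, GenericJson>> it = bufferedElements.iterator();
if (it.hasNext()) {
    String key = it.next().getKey();
    // ... batch up bufferedElements and emit them, as in the rest of the example ...
}
```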

##########
File path: website/www/site/content/en/documentation/patterns/overview.md
##########
@@ -45,6 +45,9 @@ Pipeline patterns demonstrate common Beam use cases. Pipeline patterns are based
 **Schema patterns** - Patterns for using Schemas
 * [Using Joins](/documentation/patterns/schema/#using-joins)
 
+**State & timers patterns** - Patterns for using state & timers
+* [State & timers for throttling calls to external services](/documentation/patterns/state-and-timers/#state-and-timers-for-external-services)

Review comment:
       c/throttle/concurrency control

##########
File path: website/www/site/content/en/documentation/patterns/state-and-timers.md
##########
@@ -0,0 +1,143 @@
+---
+title: "State and timers patterns"
+---
+
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# State and timers for calling external services
+
+Usually, an Apache Beam pipeline can be authored with out-of-the-box tools and transforms like _ParDo_, _Window_ and _GroupByKey_. However, when you want tighter control, you can keep state in an otherwise stateless _DoFn_.
+
+State is kept on a per-key (and, in the case of streaming pipelines, per-window) basis, and as such, the input to your stateful DoFn needs to be keyed (e.g. by the customer identifier if you're tracking clicks from an e-commerce website).
+
+Examples of use cases are: assigning a unique ID to each element, joining streams of data in 'more exotic' ways, or batching up API calls to external services. In this section we'll go over the last one in particular.
+
+Make sure to check the [docs](https://beam.apache.org/documentation/programming-guide/#state-and-timers) for deeper understanding.
+
+The stateful DoFn we're developing will buffer incoming elements by storing them in a state cell and will output them, in batches of a given size (e.g. 5000), when the window expires.
+
+This is implemented by constructing a DoFn with:
+
+- a BagStateSpec in Python (a BagState in Java) called "buffer", which will be used to write elements to and read elements from.
+- a TimerSpec which will call back at a certain point in time - in this case at the end of the window.
+- a `process` function that adds newly incoming events to the state and sets the timer to the end of the window.
+- an `expiry` function in Python (an `onTimer` function in Java) that reads the state on expiry and outputs batches of the data, ready to be sent to a remote external service in the next transform.
+
+The result in Python:
+
+```python
+class _BatchItems(DoFn):
+    elements_bag = BagStateSpec('buffer', IterableCoder(PickleCoder()))
+    event_timer = TimerSpec('event_timer', TimeDomain.WATERMARK)
+
+    def __init__(self, max_batch_size=5000):
+        self.max_batch_size = max_batch_size
+
+    def process(self,
+                element,
+                window=DoFn.WindowParam,
+                buffer_state=DoFn.StateParam(elements_bag),
+                event_timer=DoFn.TimerParam(event_timer)):
+
+        buffer_state.add(element)
+        event_timer.set(window.max_timestamp())
+
+    @on_timer(event_timer)
+    def expiry(self, buffer_state=DoFn.StateParam(elements_bag), event_timer=DoFn.TimerParam(event_timer)):
+        batch = [element for element in buffer_state.read()]
+        if not batch:
+            return
+
+        key, _ = batch[0]
+        current_batch_size = itertools.count()
+        output = []
+
+        if batch:
+            for event in batch:
+                clear_buffer = next(current_batch_size) >= self.max_batch_size
+                if clear_buffer:
+                    yield (key, output)
+                    output.clear()
+                    current_batch_size = itertools.count()
+                output.append(event[1])
+
+        if len(output) > 0:
+            yield (key, output)
+```
+
+and in Java:
+
+```java
+class BatchRequestForRecommendationAI extends DoFn<KV<String, GenericJson>, KV<String, Iterable<GenericJson>>> {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchRequestForRecommendationAI.class);
+
+    private final Counter numberOfRowsBagged =
+            Metrics.counter(BatchRequestForRecommendationAI.class, "numberOfRowsBagged");
+
+    private final Integer maxBatchSize;
+
+    @StateId("elementsBag")
+    private final StateSpec<BagState<KV<String, GenericJson>>> elementsBag = StateSpecs.bag();
+
+    @TimerId("eventTimer")
+    private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    public BatchRequestForRecommendationAI(Integer maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    @ProcessElement
+    public void process(
+            @Element KV<String, GenericJson> element,
+            @StateId("elementsBag") BagState<KV<String, GenericJson>> elementsBag,
+            @TimerId("eventTimer") Timer eventTimer,
+            BoundedWindow w) {
+        elementsBag.add(element);
+        eventTimer.set(w.maxTimestamp());

Review comment:
       What if we are in the global window with a processing-time trigger per element? Also, if this is a long window, would we not want output before the end of the window? Maybe this could be something that is configured by the user, so that every x secs the buffer is checked?
   
   Which would also mean the code below needs to be optimised so that we don't need to read the whole bag every time.
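   One possible shape for that (a sketch only, not what the PR proposes; the "staleTimer" processing-time timer and the "count" ValueState are hypothetical additions to the DoFn above): flush on processing time so a long or global window still produces output every few seconds, and keep an element count in state so process() never has to read the whole bag.

```java
// Hypothetical additions/changes to the DoFn above ("staleTimer" and "count" are made-up names).
@TimerId("staleTimer")
private final TimerSpec staleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

@StateId("count")
private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();

@ProcessElement
public void process(
        @Element KV<String, GenericJson> element,
        @StateId("elementsBag") BagState<KV<String, GenericJson>> elementsBag,
        @StateId("count") ValueState<Integer> count,
        @TimerId("staleTimer") Timer staleTimer) {
    Integer stored = count.read();
    int size = stored == null ? 0 : stored;
    if (size == 0) {
        // First element of a fresh buffer: arm a flush, e.g. 10 seconds of processing time from now.
        staleTimer.offset(Duration.standardSeconds(10)).setRelative();
    }
    elementsBag.add(element);
    count.write(size + 1);
    // (the event-time timer from the example could still be set here as well)
}

@OnTimer("staleTimer")
public void onStale(
        @StateId("elementsBag") BagState<KV<String, GenericJson>> elementsBag,
        @StateId("count") ValueState<Integer> count,
        OutputReceiver<KV<String, Iterable<GenericJson>>> output) {
    String key = null;
    List<GenericJson> batch = new ArrayList<>();
    for (KV<String, GenericJson> kv : elementsBag.read()) {
        key = kv.getKey();
        batch.add(kv.getValue());
    }
    if (key != null) {
        // Emits everything buffered so far; chunk into maxBatchSize pieces as in the example if needed.
        output.output(KV.of(key, batch));
        elementsBag.clear();
        count.clear();
    }
}
```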

##########
File path: website/www/site/content/en/documentation/patterns/state-and-timers.md
##########
@@ -0,0 +1,143 @@
+---
+title: "State and timers patterns"
+---
+
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# State and timers for calling external services
+
+Usually, an Apache Beam pipeline can be authored with out-of-the-box tools and transforms like _ParDo_, _Window_ and _GroupByKey_. However, when you want tighter control, you can keep state in an otherwise stateless _DoFn_.
+
+State is kept on a per-key (and, in the case of streaming pipelines, per-window) basis, and as such, the input to your stateful DoFn needs to be keyed (e.g. by the customer identifier if you're tracking clicks from an e-commerce website).
+
+Examples of use cases are: assigning a unique ID to each element, joining streams of data in 'more exotic' ways, or batching up API calls to external services. In this section we'll go over the last one in particular.
+
+Make sure to check the [docs](https://beam.apache.org/documentation/programming-guide/#state-and-timers) for deeper understanding.
+
+The stateful DoFn we're developing will buffer incoming elements by storing them in a state cell and will output them, in batches of a given size (e.g. 5000), when the window expires.
+
+This is implemented by constructing a DoFn with:
+
+- a BagStateSpec in Python (a BagState in Java) called "buffer", which will be used to write elements to and read elements from.
+- a TimerSpec which will call back at a certain point in time - in this case at the end of the window.
+- a `process` function that adds newly incoming events to the state and sets the timer to the end of the window.
+- an `expiry` function in Python (an `onTimer` function in Java) that reads the state on expiry and outputs batches of the data, ready to be sent to a remote external service in the next transform.
+
+The result in Python:
+
+```python
+class _BatchItems(DoFn):
+    elements_bag = BagStateSpec('buffer', IterableCoder(PickleCoder()))
+    event_timer = TimerSpec('event_timer', TimeDomain.WATERMARK)
+
+    def __init__(self, max_batch_size=5000):
+        self.max_batch_size = max_batch_size
+
+    def process(self,
+                element,
+                window=DoFn.WindowParam,
+                buffer_state=DoFn.StateParam(elements_bag),
+                event_timer=DoFn.TimerParam(event_timer)):
+
+        buffer_state.add(element)
+        event_timer.set(window.max_timestamp())
+
+    @on_timer(event_timer)
+    def expiry(self, buffer_state=DoFn.StateParam(elements_bag), event_timer=DoFn.TimerParam(event_timer)):
+        batch = [element for element in buffer_state.read()]
+        if not batch:
+            return
+
+        key, _ = batch[0]
+        current_batch_size = itertools.count()
+        output = []
+
+        if batch:
+            for event in batch:
+                clear_buffer = next(current_batch_size) >= self.max_batch_size
+                if clear_buffer:
+                    yield (key, output)
+                    output.clear()
+                    current_batch_size = itertools.count()
+                output.append(event[1])
+
+        if len(output) > 0:
+            yield (key, output)
+```
+
+and in Java:
+
+```java
+class BatchRequestForRecommendationAI extends DoFn<KV<String, GenericJson>, KV<String, Iterable<GenericJson>>> {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchRequestForRecommendationAI.class);
+
+    private final Counter numberOfRowsBagged =
+            Metrics.counter(BatchRequestForRecommendationAI.class, "numberOfRowsBagged");
+
+    private final Integer maxBatchSize;
+
+    @StateId("elementsBag")
+    private final StateSpec<BagState<KV<String, GenericJson>>> elementsBag = StateSpecs.bag();
+
+    @TimerId("eventTimer")
+    private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    public BatchRequestForRecommendationAI(Integer maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    @ProcessElement
+    public void process(
+            @Element KV<String, GenericJson> element,
+            @StateId("elementsBag") BagState<KV<String, GenericJson>> elementsBag,
+            @TimerId("eventTimer") Timer eventTimer,
+            BoundedWindow w) {
+        elementsBag.add(element);
+        eventTimer.set(w.maxTimestamp());
+    }
+
+    @OnTimer("eventTimer")
+    public void onTimer(
+            @StateId("elementsBag") BagState<KV<String, GenericJson>> elementsBag,
+            OutputReceiver<KV<String, Iterable<GenericJson>>> output) {
+        if (elementsBag.read().iterator().hasNext()) {
+            String key = elementsBag.read().iterator().next().getKey();
+            AtomicInteger currentBatchSize = new AtomicInteger();

Review comment:
       Why use AtomicInteger here?
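   Presumably a plain local counter is enough here, since onTimer() runs single-threaded for a given key. The rest of the method is cut off in this diff, so the following is only a guess at the shape of the batching loop (key, output, maxBatchSize and numberOfRowsBagged come from the method and class shown above):

```java
// Hypothetical plain-int variant of the batching loop; no concurrent access, so no AtomicInteger.
int currentBatchSize = 0;
List<GenericJson> batch = new ArrayList<>();
for (KV<String, GenericJson> kv : elementsBag.read()) {
    batch.add(kv.getValue());
    currentBatchSize++;
    if (currentBatchSize >= maxBatchSize) {
        output.output(KV.of(key, batch));
        numberOfRowsBagged.inc(batch.size());
        batch = new ArrayList<>();
        currentBatchSize = 0;
    }
}
if (!batch.isEmpty()) {
    output.output(KV.of(key, batch));
    numberOfRowsBagged.inc(batch.size());
}
```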




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 545017)
    Time Spent: 40m  (was: 0.5h)

> State & timer for batched RPC calls pattern
> -------------------------------------------
>
>                 Key: BEAM-11545
>                 URL: https://issues.apache.org/jira/browse/BEAM-11545
>             Project: Beam
>          Issue Type: New Feature
>          Components: website
>            Reporter: Matthias Baetens
>            Priority: P3
>              Labels: pipeline-patterns
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Develop a pattern that shows how to leverage State & Timer APIs to batch RPC calls.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)