You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/06 18:19:15 UTC

[beam] branch master updated: Tune StreamingModeExecutionContext allocations (#22142)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ec47b12cd54 Tune StreamingModeExecutionContext allocations (#22142)
ec47b12cd54 is described below

commit ec47b12cd54bef4632c8a8e0ebca6b88e597c327
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Wed Jul 6 14:19:09 2022 -0400

    Tune StreamingModeExecutionContext allocations (#22142)
---
 .../worker/StreamingModeExecutionContext.java      | 25 ++++++++++------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 65838d7fd44..da18f865500 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -720,6 +720,12 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
       return StreamingModeExecutionContext.this.getSideInputNotifications();
     }
 
+    private void ensureStateful(String errorPrefix) {
+      if (stateFamily == null) {
+        throw new IllegalStateException(errorPrefix + " for stateless step: " + getNameContext());
+      }
+    }
+
     @Override
     public <T, W extends BoundedWindow> void writePCollectionViewData(
         TupleTag<?> tag,
@@ -738,10 +744,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
       ByteString.Output windowStream = ByteString.newOutput();
       windowCoder.encode(window, windowStream, Coder.Context.OUTER);
 
-      if (stateFamily == null) {
-        throw new IllegalStateException(
-            "Tried to write view data for stateless step: " + getNameContext());
-      }
+      ensureStateful("Tried to write view data");
 
       Windmill.GlobalData.Builder builder =
           Windmill.GlobalData.newBuilder()
@@ -768,9 +771,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
     /** Note that there is data on the current key that is blocked on the given side input. */
     @Override
     public void addBlockingSideInput(Windmill.GlobalDataRequest sideInput) {
-      checkState(
-          stateFamily != null,
-          "Tried to set global data request for stateless step: " + getNameContext());
+      ensureStateful("Tried to set global data request");
       sideInput =
           Windmill.GlobalDataRequest.newBuilder(sideInput).setStateFamily(stateFamily).build();
       outputBuilder.addGlobalDataRequests(sideInput);
@@ -787,22 +788,18 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
 
     @Override
     public StateInternals stateInternals() {
-      checkState(
-          stateFamily != null, "Tried to access state for stateless step: " + getNameContext());
+      ensureStateful("Tried to access state");
       return checkNotNull(stateInternals);
     }
 
     @Override
     public TimerInternals timerInternals() {
-      checkState(
-          stateFamily != null, "Tried to access timers for stateless step: " + getNameContext());
+      ensureStateful("Tried to access timers");
       return checkNotNull(systemTimerInternals);
     }
 
     public TimerInternals userTimerInternals() {
-      checkState(
-          stateFamily != null,
-          "Tried to access user timers for stateless step: " + getNameContext());
+      ensureStateful("Tried to access user timers");
       return checkNotNull(userTimerInternals);
     }
   }