You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/06 18:19:15 UTC
[beam] branch master updated: Tune StreamingModeExecutionContext allocations (#22142)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ec47b12cd54 Tune StreamingModeExecutionContext allocations (#22142)
ec47b12cd54 is described below
commit ec47b12cd54bef4632c8a8e0ebca6b88e597c327
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Wed Jul 6 14:19:09 2022 -0400
Tune StreamingModeExecutionContext allocations (#22142)
---
.../worker/StreamingModeExecutionContext.java | 25 ++++++++++------------
1 file changed, 11 insertions(+), 14 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 65838d7fd44..da18f865500 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -720,6 +720,12 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
return StreamingModeExecutionContext.this.getSideInputNotifications();
}
+ private void ensureStateful(String errorPrefix) {
+ if (stateFamily == null) {
+ throw new IllegalStateException(errorPrefix + " for stateless step: " + getNameContext());
+ }
+ }
+
@Override
public <T, W extends BoundedWindow> void writePCollectionViewData(
TupleTag<?> tag,
@@ -738,10 +744,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
ByteString.Output windowStream = ByteString.newOutput();
windowCoder.encode(window, windowStream, Coder.Context.OUTER);
- if (stateFamily == null) {
- throw new IllegalStateException(
- "Tried to write view data for stateless step: " + getNameContext());
- }
+ ensureStateful("Tried to write view data");
Windmill.GlobalData.Builder builder =
Windmill.GlobalData.newBuilder()
@@ -768,9 +771,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
/** Note that there is data on the current key that is blocked on the given side input. */
@Override
public void addBlockingSideInput(Windmill.GlobalDataRequest sideInput) {
- checkState(
- stateFamily != null,
- "Tried to set global data request for stateless step: " + getNameContext());
+ ensureStateful("Tried to set global data request");
sideInput =
Windmill.GlobalDataRequest.newBuilder(sideInput).setStateFamily(stateFamily).build();
outputBuilder.addGlobalDataRequests(sideInput);
@@ -787,22 +788,18 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
@Override
public StateInternals stateInternals() {
- checkState(
- stateFamily != null, "Tried to access state for stateless step: " + getNameContext());
+ ensureStateful("Tried to access state");
return checkNotNull(stateInternals);
}
@Override
public TimerInternals timerInternals() {
- checkState(
- stateFamily != null, "Tried to access timers for stateless step: " + getNameContext());
+ ensureStateful("Tried to access timers");
return checkNotNull(systemTimerInternals);
}
public TimerInternals userTimerInternals() {
- checkState(
- stateFamily != null,
- "Tried to access user timers for stateless step: " + getNameContext());
+ ensureStateful("Tried to access user timers");
return checkNotNull(userTimerInternals);
}
}