You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/03/13 19:08:35 UTC
[beam] branch master updated: Use the maxWorkItemCommitBytes value returned in the StreamingConfigTask, if there is one.
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7df1551 Use the maxWorkItemCommitBytes value returned in the StreamingConfigTask, if there is one.
new 5a1dc26 Merge pull request #8033 from drieber/maxWorkItemCommitBytes
7df1551 is described below
commit 7df1551778a58a76f345eca39eb5c70ec4848912
Author: David Rieber <dr...@google.com>
AuthorDate: Mon Mar 11 15:49:21 2019 -0700
Use the maxWorkItemCommitBytes value returned in the StreamingConfigTask, if there is one.
---
.../runners/dataflow/worker/StreamingDataflowWorker.java | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 38d8349..cea383d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -695,6 +695,7 @@ public class StreamingDataflowWorker {
LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
+ LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
}
private Node createPortNode(String predecessorId, String successorId) {
@@ -726,6 +727,9 @@ public class StreamingDataflowWorker {
@VisibleForTesting
public void setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes) {
+ if (maxWorkItemCommitBytes != this.maxWorkItemCommitBytes) {
+ LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
+ }
this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
}
@@ -1585,12 +1589,20 @@ public class StreamingDataflowWorker {
if (workItem == null || !workItem.isPresent() || workItem.get() == null) {
return;
}
- setMaxWorkItemCommitBytes(180 << 20);
StreamingConfigTask config = workItem.get().getStreamingConfigTask();
Preconditions.checkState(config != null);
if (config.getUserStepToStateFamilyNameMap() != null) {
stateNameMap.putAll(config.getUserStepToStateFamilyNameMap());
}
+ if (computation == null) {
+ if (config.getMaxWorkItemCommitBytes() != null
+ && config.getMaxWorkItemCommitBytes() > 0
+ && config.getMaxWorkItemCommitBytes() <= Integer.MAX_VALUE) {
+ setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
+ } else {
+ setMaxWorkItemCommitBytes(180 << 20);
+ }
+ }
List<StreamingComputationConfig> configs = config.getStreamingComputationConfigs();
if (configs != null) {
for (StreamingComputationConfig computationConfig : configs) {