You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@beam.apache.org by al...@apache.org on 2019/03/13 19:08:35 UTC

[beam] branch master updated: Use the maxWorkItemCommitBytes value returned in the StreamingConfigTask, if there is one.

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7df1551  Use the maxWorkItemCommitBytes value returned in the StreamingConfigTask, if there is one.
     new 5a1dc26  Merge pull request #8033 from drieber/maxWorkItemCommitBytes
7df1551 is described below

commit 7df1551778a58a76f345eca39eb5c70ec4848912
Author: David Rieber <dr...@google.com>
AuthorDate: Mon Mar 11 15:49:21 2019 -0700

    Use the maxWorkItemCommitBytes value returned in the StreamingConfigTask, if there is one.
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java   | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 38d8349..cea383d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -695,6 +695,7 @@ public class StreamingDataflowWorker {
     LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
+    LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes);
   }
 
   private Node createPortNode(String predecessorId, String successorId) {
@@ -726,6 +727,9 @@ public class StreamingDataflowWorker {
 
   @VisibleForTesting
   public void setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes) {
+    if (maxWorkItemCommitBytes != this.maxWorkItemCommitBytes) {
+      LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes);
+    }
     this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
   }
 
@@ -1585,12 +1589,20 @@ public class StreamingDataflowWorker {
     if (workItem == null || !workItem.isPresent() || workItem.get() == null) {
       return;
     }
-    setMaxWorkItemCommitBytes(180 << 20);
     StreamingConfigTask config = workItem.get().getStreamingConfigTask();
     Preconditions.checkState(config != null);
     if (config.getUserStepToStateFamilyNameMap() != null) {
       stateNameMap.putAll(config.getUserStepToStateFamilyNameMap());
     }
+    if (computation == null) {
+      if (config.getMaxWorkItemCommitBytes() != null
+          && config.getMaxWorkItemCommitBytes() > 0
+          && config.getMaxWorkItemCommitBytes() <= Integer.MAX_VALUE) {
+        setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
+      } else {
+        setMaxWorkItemCommitBytes(180 << 20);
+      }
+    }
     List<StreamingComputationConfig> configs = config.getStreamingComputationConfigs();
     if (configs != null) {
       for (StreamingComputationConfig computationConfig : configs) {