You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/03/08 19:36:35 UTC

[beam] branch master updated: Improve google cloud dataflow java stream protocol to ensure periodic health checking

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 557cfa6  Improve google cloud dataflow java stream protocol to ensure periodic health checking
     new 8f179ba  Merge pull request #7993: Improve google cloud dataflow java stream protocol to ensure periodic health checking
557cfa6 is described below

commit 557cfa6526f2b4513294a51701b530f3674fe9d4
Author: slavachernyak <ch...@google.com>
AuthorDate: Mon Mar 4 14:49:50 2019 -0800

    Improve google cloud dataflow java stream protocol to ensure periodic health checking
---
 .../runners/dataflow/worker/MetricTrackingWindmillServerStub.java     | 4 +++-
 .../apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java  | 4 +---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
index a0daa8e..6c88d58 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
@@ -237,13 +237,15 @@ public class MetricTrackingWindmillServerStub {
     activeHeartbeats.set(active.size());
     try {
       if (useStreamingRequests) {
+        // With streaming requests, always send the request even when it is empty, to ensure that
+        // we trigger health checks for the stream even when it is idle.
         GetDataStream stream = streamPool.getStream();
         try {
           stream.refreshActiveWork(active);
         } finally {
           streamPool.releaseStream(stream);
         }
-      } else {
+      } else if (!active.isEmpty()) {
         Windmill.GetDataRequest.Builder builder = Windmill.GetDataRequest.newBuilder();
         for (Map.Entry<String, List<KeyedGetDataRequest>> entry : active.entrySet()) {
           builder.addRequests(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index a6e8df6..38d8349 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1920,9 +1920,7 @@ public class StreamingDataflowWorker {
       active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline));
     }
 
-    if (!active.isEmpty()) {
-      metricTrackingWindmillServer.refreshActiveWork(active);
-    }
+    metricTrackingWindmillServer.refreshActiveWork(active);
   }
 
   /**