You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/03/08 19:36:35 UTC
[beam] branch master updated: Improve google cloud dataflow java stream protocol to ensure periodic health checking
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 557cfa6 Improve google cloud dataflow java stream protocol to ensure periodic health checking
new 8f179ba Merge pull request #7993: Improve google cloud dataflow java stream protocol to ensure periodic health checking
557cfa6 is described below
commit 557cfa6526f2b4513294a51701b530f3674fe9d4
Author: slavachernyak <ch...@google.com>
AuthorDate: Mon Mar 4 14:49:50 2019 -0800
Improve google cloud dataflow java stream protocol to ensure periodic health checking
---
.../runners/dataflow/worker/MetricTrackingWindmillServerStub.java | 4 +++-
.../apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java | 4 +---
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
index a0daa8e..6c88d58 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
@@ -237,13 +237,15 @@ public class MetricTrackingWindmillServerStub {
activeHeartbeats.set(active.size());
try {
if (useStreamingRequests) {
+ // With streaming requests, always send the request even when it is empty, to ensure that
+ // we trigger health checks for the stream even when it is idle.
GetDataStream stream = streamPool.getStream();
try {
stream.refreshActiveWork(active);
} finally {
streamPool.releaseStream(stream);
}
- } else {
+ } else if (!active.isEmpty()) {
Windmill.GetDataRequest.Builder builder = Windmill.GetDataRequest.newBuilder();
for (Map.Entry<String, List<KeyedGetDataRequest>> entry : active.entrySet()) {
builder.addRequests(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index a6e8df6..38d8349 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1920,9 +1920,7 @@ public class StreamingDataflowWorker {
active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline));
}
- if (!active.isEmpty()) {
- metricTrackingWindmillServer.refreshActiveWork(active);
- }
+ metricTrackingWindmillServer.refreshActiveWork(active);
}
/**