You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/08 18:15:12 UTC

[beam] branch master updated: Clean up stream timeout handling for gRPC windmill streams to prevent incorrect overriding in tests

This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d4047  Clean up stream timeout handling for gRPC windmill streams to prevent incorrect overriding in tests
e4d4047 is described below

commit e4d404787756977db42b692a758b85cb2666ac1b
Author: slavachernyak <ch...@google.com>
AuthorDate: Wed Jan 2 13:27:28 2019 -0800

    Clean up stream timeout handling for gRPC windmill streams to prevent incorrect overriding in tests
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  4 +--
 .../worker/windmill/GrpcWindmillServer.java        | 41 ++++++++++++++--------
 .../worker/windmill/WindmillServerStub.java        |  6 ++++
 3 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d7a52f3..690603c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -939,9 +939,7 @@ public class StreamingDataflowWorker {
         // Reconnect every now and again to enable better load balancing.
         // If at any point the server closes the stream, we will reconnect immediately; otherwise
         // we half-close the stream after some time and create a new one.
-        if (!stream.awaitTermination(3, TimeUnit.MINUTES)) {
-          stream.close();
-        }
+        stream.closeAfterDefaultTimeout();
       } catch (InterruptedException e) {
         // Continue processing until !running.get()
       }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 27036db..3acbd49 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -109,7 +109,11 @@ public class GrpcWindmillServer extends WindmillServerStub {
 
   // If a connection cannot be established, gRPC will fail fast so this deadline can be relatively
   // high.
-  private static final long DEFAULT_RPC_DEADLINE_SECONDS = 300;
+  private static final long DEFAULT_UNARY_RPC_DEADLINE_SECONDS = 300;
+  private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
+  // Stream clean close seconds must be set lower than the stream deadline seconds.
+  private static final long DEFAULT_STREAM_CLEAN_CLOSE_SECONDS = 180;
+
   private static final Duration MIN_BACKOFF = Duration.millis(1);
   private static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
   // Internal gRPC batch size is 64KB, so pick something slightly smaller to account for other
@@ -126,7 +130,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
   private final List<CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1BlockingStub>
       syncStubList = new ArrayList<>();
   private WindmillApplianceGrpc.WindmillApplianceBlockingStub syncApplianceStub = null;
-  private long deadlineSeconds = DEFAULT_RPC_DEADLINE_SECONDS;
+  private long unaryDeadlineSeconds = DEFAULT_UNARY_RPC_DEADLINE_SECONDS;
   private ImmutableSet<HostAndPort> endpoints;
   private int logEveryNStreamFailures = 20;
   private Duration maxBackoff = MAX_BACKOFF;
@@ -200,7 +204,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
   private synchronized void initializeLocalHost(int port) throws IOException {
     this.logEveryNStreamFailures = 1;
     this.maxBackoff = Duration.millis(500);
-    this.deadlineSeconds = 10; // For local testing use a short deadline.
+    this.unaryDeadlineSeconds = 10; // For local testing use a short deadline.
     Channel channel = localhostChannel(port);
     if (streamingEngineEnabled()) {
       this.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(channel));
@@ -414,7 +418,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       return callWithBackoff(
           () ->
               syncStub()
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
                   .getWork(
                       request
                           .toBuilder()
@@ -426,7 +430,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       return callWithBackoff(
           () ->
               syncApplianceStub
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
                   .getWork(request));
     }
   }
@@ -437,7 +441,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       return callWithBackoff(
           () ->
               syncStub()
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
                   .getData(
                       request
                           .toBuilder()
@@ -448,7 +452,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       return callWithBackoff(
           () ->
               syncApplianceStub
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
                   .getData(request));
     }
   }
@@ -459,7 +463,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       return callWithBackoff(
           () ->
               syncStub()
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
                   .commitWork(
                       request
                           .toBuilder()
@@ -470,7 +474,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       return callWithBackoff(
           () ->
               syncApplianceStub
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
                   .commitWork(request));
     }
   }
@@ -505,7 +509,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       return callWithBackoff(
           () ->
               syncApplianceStub
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
                   .getConfig(request));
     }
   }
@@ -519,7 +523,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       return callWithBackoff(
           () ->
               syncApplianceStub
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
                   .reportStats(request));
     }
   }
@@ -712,6 +716,15 @@ public class GrpcWindmillServer extends WindmillServerStub {
     }
 
     @Override
+    public final void closeAfterDefaultTimeout() throws InterruptedException {
+      if (!finishLatch.await(DEFAULT_STREAM_CLEAN_CLOSE_SECONDS, TimeUnit.SECONDS)) {
+        // If the stream did not close due to error in the specified amount of time, half-close
+        // the stream cleanly.
+        close();
+      }
+    }
+
+    @Override
     public final Instant startTime() {
       return new Instant(startTimeMs.get());
     }
@@ -730,7 +743,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       super(
           responseObserver ->
               stub()
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
                   .getWorkStream(responseObserver));
       this.request = request;
       this.receiver = receiver;
@@ -897,7 +910,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       super(
           responseObserver ->
               stub()
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
                   .getDataStream(responseObserver));
       startStream();
     }
@@ -1134,7 +1147,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
       super(
           responseObserver ->
               stub()
-                  .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+                  .withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
                   .commitWorkStream(responseObserver));
       startStream();
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index fdd1ae6..efaca37 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -110,6 +110,12 @@ public abstract class WindmillServerStub implements StatusDataProvider {
     /** Waits for the server to close its end of the connection, with timeout. */
     boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException;
 
+    /**
+     * Cleanly closes the stream after an implementation-specified timeout, unless the stream is
+     * aborted before the timeout is reached.
+     */
+    void closeAfterDefaultTimeout() throws InterruptedException;
+
     /** Returns when the stream was opened. */
     Instant startTime();
   }