You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2019/01/08 18:15:12 UTC
[beam] branch master updated: Clean up stream timeout handling for
gRPC windmill streams to prevent incorrect overriding in tests
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e4d4047 Clean up stream timeout handling for gRPC windmill streams to prevent incorrect overriding in tests
e4d4047 is described below
commit e4d404787756977db42b692a758b85cb2666ac1b
Author: slavachernyak <ch...@google.com>
AuthorDate: Wed Jan 2 13:27:28 2019 -0800
Clean up stream timeout handling for gRPC windmill streams to prevent incorrect overriding in tests
---
.../dataflow/worker/StreamingDataflowWorker.java | 4 +--
.../worker/windmill/GrpcWindmillServer.java | 41 ++++++++++++++--------
.../worker/windmill/WindmillServerStub.java | 6 ++++
3 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d7a52f3..690603c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -939,9 +939,7 @@ public class StreamingDataflowWorker {
// Reconnect every now and again to enable better load balancing.
// If at any point the server closes the stream, we will reconnect immediately; otherwise
// we half-close the stream after some time and create a new one.
- if (!stream.awaitTermination(3, TimeUnit.MINUTES)) {
- stream.close();
- }
+ stream.closeAfterDefaultTimeout();
} catch (InterruptedException e) {
// Continue processing until !running.get()
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 27036db..3acbd49 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -109,7 +109,11 @@ public class GrpcWindmillServer extends WindmillServerStub {
// If a connection cannot be established, gRPC will fail fast so this deadline can be relatively
// high.
- private static final long DEFAULT_RPC_DEADLINE_SECONDS = 300;
+ private static final long DEFAULT_UNARY_RPC_DEADLINE_SECONDS = 300;
+ private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
+ // Stream clean close seconds must be set lower than the stream deadline seconds.
+ private static final long DEFAULT_STREAM_CLEAN_CLOSE_SECONDS = 180;
+
private static final Duration MIN_BACKOFF = Duration.millis(1);
private static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
// Internal gRPC batch size is 64KB, so pick something slightly smaller to account for other
@@ -126,7 +130,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
private final List<CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1BlockingStub>
syncStubList = new ArrayList<>();
private WindmillApplianceGrpc.WindmillApplianceBlockingStub syncApplianceStub = null;
- private long deadlineSeconds = DEFAULT_RPC_DEADLINE_SECONDS;
+ private long unaryDeadlineSeconds = DEFAULT_UNARY_RPC_DEADLINE_SECONDS;
private ImmutableSet<HostAndPort> endpoints;
private int logEveryNStreamFailures = 20;
private Duration maxBackoff = MAX_BACKOFF;
@@ -200,7 +204,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
private synchronized void initializeLocalHost(int port) throws IOException {
this.logEveryNStreamFailures = 1;
this.maxBackoff = Duration.millis(500);
- this.deadlineSeconds = 10; // For local testing use a short deadline.
+ this.unaryDeadlineSeconds = 10; // For local testing use a short deadline.
Channel channel = localhostChannel(port);
if (streamingEngineEnabled()) {
this.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(channel));
@@ -414,7 +418,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
return callWithBackoff(
() ->
syncStub()
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
.getWork(
request
.toBuilder()
@@ -426,7 +430,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
return callWithBackoff(
() ->
syncApplianceStub
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
.getWork(request));
}
}
@@ -437,7 +441,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
return callWithBackoff(
() ->
syncStub()
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
.getData(
request
.toBuilder()
@@ -448,7 +452,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
return callWithBackoff(
() ->
syncApplianceStub
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
.getData(request));
}
}
@@ -459,7 +463,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
return callWithBackoff(
() ->
syncStub()
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
.commitWork(
request
.toBuilder()
@@ -470,7 +474,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
return callWithBackoff(
() ->
syncApplianceStub
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
.commitWork(request));
}
}
@@ -505,7 +509,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
return callWithBackoff(
() ->
syncApplianceStub
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
.getConfig(request));
}
}
@@ -519,7 +523,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
return callWithBackoff(
() ->
syncApplianceStub
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
.reportStats(request));
}
}
@@ -712,6 +716,15 @@ public class GrpcWindmillServer extends WindmillServerStub {
}
@Override
+ public final void closeAfterDefaultTimeout() throws InterruptedException {
+ if (!finishLatch.await(DEFAULT_STREAM_CLEAN_CLOSE_SECONDS, TimeUnit.SECONDS)) {
+ // If the stream did not close due to error in the specified amount of time, half-close
+ // the stream cleanly.
+ close();
+ }
+ }
+
+ @Override
public final Instant startTime() {
return new Instant(startTimeMs.get());
}
@@ -730,7 +743,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
super(
responseObserver ->
stub()
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
.getWorkStream(responseObserver));
this.request = request;
this.receiver = receiver;
@@ -897,7 +910,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
super(
responseObserver ->
stub()
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
.getDataStream(responseObserver));
startStream();
}
@@ -1134,7 +1147,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
super(
responseObserver ->
stub()
- .withDeadlineAfter(deadlineSeconds, TimeUnit.SECONDS)
+ .withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS)
.commitWorkStream(responseObserver));
startStream();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
index fdd1ae6..efaca37 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java
@@ -110,6 +110,12 @@ public abstract class WindmillServerStub implements StatusDataProvider {
/** Waits for the server to close its end of the connection, with timeout. */
boolean awaitTermination(int time, TimeUnit unit) throws InterruptedException;
+ /**
+ * Cleanly closes the stream after implementation-specified timeout, unless the stream is
+ * aborted before the timeout is reached.
+ */
+ void closeAfterDefaultTimeout() throws InterruptedException;
+
/** Returns when the stream was opened. */
Instant startTime();
}