You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2020/12/04 18:35:20 UTC
[beam] branch master updated: Store last error status per stream to improve debugging
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 73752cf Store last error status per stream to improve debugging
new 07caf33 Merge pull request #13271 from scwhittle/dataflow_debug
73752cf is described below
commit 73752cfc2387f6eafad0b7f78c7c401051b9c1f9
Author: Sam Whittle <sa...@google.com>
AuthorDate: Thu Nov 5 02:49:30 2020 -0800
Store last error status per stream to improve debugging
---
.../dataflow/worker/windmill/GrpcWindmillServer.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index f3f019e..64517aa 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -633,6 +634,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
private final AtomicLong lastSendTimeMs = new AtomicLong();
private final AtomicLong lastResponseTimeMs = new AtomicLong();
private final AtomicInteger errorCount = new AtomicInteger();
+ private final AtomicReference<String> lastError = new AtomicReference<>();
private final BackOff backoff = grpcBackoff();
private final AtomicLong sleepUntil = new AtomicLong();
protected final AtomicBoolean clientClosed = new AtomicBoolean();
@@ -724,7 +726,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
public final void appendSummaryHtml(PrintWriter writer) {
appendSpecificHtml(writer);
if (errorCount.get() > 0) {
- writer.format(", %d errors", errorCount.get());
+ writer.format(", %d errors, last error [ %s ]", errorCount.get(), lastError.get());
}
if (clientClosed.get()) {
writer.write(", client closed");
@@ -779,6 +781,8 @@ public class GrpcWindmillServer extends WindmillServerStub {
if (t instanceof StatusRuntimeException) {
status = ((StatusRuntimeException) t).getStatus();
}
+ String statusError = status.toString();
+ lastError.set(statusError);
if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
long nowMillis = Instant.now().getMillis();
String responseDebug;
@@ -794,7 +798,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
AbstractWindmillStream.this.getClass(),
errorCount.get(),
t.toString(),
- status,
+ statusError,
nowMillis - startTimeMs.get(),
responseDebug);
}
@@ -814,9 +818,11 @@ public class GrpcWindmillServer extends WindmillServerStub {
}
} else {
errorCount.incrementAndGet();
- LOG.warn(
+ String error =
"Stream completed successfully but did not complete requested operations, "
- + "recreating");
+ + "recreating";
+ LOG.warn(error);
+ lastError.set(error);
}
executor.execute(AbstractWindmillStream.this::startStream);
}