You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by al...@apache.org on 2020/12/04 18:35:20 UTC

[beam] branch master updated: Store last error status per stream to improve debugging

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 73752cf  Store last error status per stream to improve debugging
     new 07caf33  Merge pull request #13271 from scwhittle/dataflow_debug
73752cf is described below

commit 73752cfc2387f6eafad0b7f78c7c401051b9c1f9
Author: Sam Whittle <sa...@google.com>
AuthorDate: Thu Nov 5 02:49:30 2020 -0800

    Store last error status per stream to improve debugging
---
 .../dataflow/worker/windmill/GrpcWindmillServer.java       | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index f3f019e..64517aa 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -633,6 +634,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
     private final AtomicLong lastSendTimeMs = new AtomicLong();
     private final AtomicLong lastResponseTimeMs = new AtomicLong();
     private final AtomicInteger errorCount = new AtomicInteger();
+    private final AtomicReference<String> lastError = new AtomicReference<>();
     private final BackOff backoff = grpcBackoff();
     private final AtomicLong sleepUntil = new AtomicLong();
     protected final AtomicBoolean clientClosed = new AtomicBoolean();
@@ -724,7 +726,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
     public final void appendSummaryHtml(PrintWriter writer) {
       appendSpecificHtml(writer);
       if (errorCount.get() > 0) {
-        writer.format(", %d errors", errorCount.get());
+        writer.format(", %d errors, last error [ %s ]", errorCount.get(), lastError.get());
       }
       if (clientClosed.get()) {
         writer.write(", client closed");
@@ -779,6 +781,8 @@ public class GrpcWindmillServer extends WindmillServerStub {
           if (t instanceof StatusRuntimeException) {
             status = ((StatusRuntimeException) t).getStatus();
           }
+          String statusError = status.toString();
+          lastError.set(statusError);
           if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
             long nowMillis = Instant.now().getMillis();
             String responseDebug;
@@ -794,7 +798,7 @@ public class GrpcWindmillServer extends WindmillServerStub {
                 AbstractWindmillStream.this.getClass(),
                 errorCount.get(),
                 t.toString(),
-                status,
+                statusError,
                 nowMillis - startTimeMs.get(),
                 responseDebug);
           }
@@ -814,9 +818,11 @@ public class GrpcWindmillServer extends WindmillServerStub {
           }
         } else {
           errorCount.incrementAndGet();
-          LOG.warn(
+          String error =
               "Stream completed successfully but did not complete requested operations, "
-                  + "recreating");
+                  + "recreating";
+          LOG.warn(error);
+          lastError.set(error);
         }
         executor.execute(AbstractWindmillStream.this::startStream);
       }