You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2024/03/04 10:21:02 UTC

(beam) branch master updated: Add last error time to stream error message (#30476)

This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ad8d530916 Add last error time to stream error message (#30476)
4ad8d530916 is described below

commit 4ad8d530916a590e91d8b091291cfa2eaefec029
Author: Arun Pandian <ar...@gmail.com>
AuthorDate: Mon Mar 4 02:20:52 2024 -0800

    Add last error time to stream error message (#30476)
---
 .../worker/windmill/client/AbstractWindmillStream.java | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index baafc22e030..028a5c2e1d4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -37,6 +37,7 @@ import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusRuntimeException;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
  * synchronizing on this.
  */
 public abstract class AbstractWindmillStream<RequestT, ResponseT> implements WindmillStream {
+
   public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular flow-control.
@@ -74,6 +76,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
   private final AtomicLong lastResponseTimeMs;
   private final AtomicInteger errorCount;
   private final AtomicReference<String> lastError;
+  private final AtomicReference<DateTime> lastErrorTime;
   private final AtomicLong sleepUntil;
   private final CountDownLatch finishLatch;
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
@@ -105,6 +108,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
     this.lastResponseTimeMs = new AtomicLong();
     this.errorCount = new AtomicInteger();
     this.lastError = new AtomicReference<>();
+    this.lastErrorTime = new AtomicReference<>();
     this.sleepUntil = new AtomicLong();
     this.finishLatch = new CountDownLatch(1);
     this.requestObserverSupplier =
@@ -210,7 +214,9 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
   public final void appendSummaryHtml(PrintWriter writer) {
     appendSpecificHtml(writer);
     if (errorCount.get() > 0) {
-      writer.format(", %d errors, last error [ %s ]", errorCount.get(), lastError.get());
+      writer.format(
+          ", %d errors, last error [ %s ] at [%s]",
+          errorCount.get(), lastError.get(), lastErrorTime.get());
     }
     if (clientClosed.get()) {
       writer.write(", client closed");
@@ -250,6 +256,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
   }
 
   private class ResponseObserver implements StreamObserver<ResponseT> {
+
     @Override
     public void onNext(ResponseT response) {
       try {
@@ -285,7 +292,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
           status = ((StatusRuntimeException) t).getStatus();
         }
         String statusError = status == null ? "" : status.toString();
-        lastError.set(statusError);
+        setLastError(statusError);
         if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
           long nowMillis = Instant.now().getMillis();
           String responseDebug;
@@ -325,9 +332,14 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
             "Stream completed successfully but did not complete requested operations, "
                 + "recreating";
         LOG.warn(error);
-        lastError.set(error);
+        setLastError(error);
       }
       executor.execute(AbstractWindmillStream.this::startStream);
     }
   }
+
+  private void setLastError(String error) {
+    lastError.set(error);
+    lastErrorTime.set(DateTime.now());
+  }
 }