You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sc...@apache.org on 2024/03/04 10:21:02 UTC
(beam) branch master updated: Add last error time to stream error message (#30476)
This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4ad8d530916 Add last error time to stream error message (#30476)
4ad8d530916 is described below
commit 4ad8d530916a590e91d8b091291cfa2eaefec029
Author: Arun Pandian <ar...@gmail.com>
AuthorDate: Mon Mar 4 02:20:52 2024 -0800
Add last error time to stream error message (#30476)
---
.../worker/windmill/client/AbstractWindmillStream.java | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index baafc22e030..028a5c2e1d4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -37,6 +37,7 @@ import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
* synchronizing on this.
*/
public abstract class AbstractWindmillStream<RequestT, ResponseT> implements WindmillStream {
+
public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
// Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
// per-chunk overhead, and small enough that we can still perform granular flow-control.
@@ -74,6 +76,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final AtomicLong lastResponseTimeMs;
private final AtomicInteger errorCount;
private final AtomicReference<String> lastError;
+ private final AtomicReference<DateTime> lastErrorTime;
private final AtomicLong sleepUntil;
private final CountDownLatch finishLatch;
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
@@ -105,6 +108,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
this.lastResponseTimeMs = new AtomicLong();
this.errorCount = new AtomicInteger();
this.lastError = new AtomicReference<>();
+ this.lastErrorTime = new AtomicReference<>();
this.sleepUntil = new AtomicLong();
this.finishLatch = new CountDownLatch(1);
this.requestObserverSupplier =
@@ -210,7 +214,9 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
public final void appendSummaryHtml(PrintWriter writer) {
appendSpecificHtml(writer);
if (errorCount.get() > 0) {
- writer.format(", %d errors, last error [ %s ]", errorCount.get(), lastError.get());
+ writer.format(
+ ", %d errors, last error [ %s ] at [%s]",
+ errorCount.get(), lastError.get(), lastErrorTime.get());
}
if (clientClosed.get()) {
writer.write(", client closed");
@@ -250,6 +256,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
}
private class ResponseObserver implements StreamObserver<ResponseT> {
+
@Override
public void onNext(ResponseT response) {
try {
@@ -285,7 +292,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
status = ((StatusRuntimeException) t).getStatus();
}
String statusError = status == null ? "" : status.toString();
- lastError.set(statusError);
+ setLastError(statusError);
if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
long nowMillis = Instant.now().getMillis();
String responseDebug;
@@ -325,9 +332,14 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
"Stream completed successfully but did not complete requested operations, "
+ "recreating";
LOG.warn(error);
- lastError.set(error);
+ setLastError(error);
}
executor.execute(AbstractWindmillStream.this::startStream);
}
}
+
+ private void setLastError(String error) {
+ lastError.set(error);
+ lastErrorTime.set(DateTime.now());
+ }
}