You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2022/11/11 00:47:40 UTC
[beam] branch master updated: Wire SamzaPipelineOptions to Exeption listener interface (#24109)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8f8f089a5f5 Wire SamzaPipelineOptions to Exeption listener interface (#24109)
8f8f089a5f5 is described below
commit 8f8f089a5f565f179905984cef3522a0577d9219
Author: Sanil Jain <sn...@linkedin.com>
AuthorDate: Thu Nov 10 16:47:31 2022 -0800
Wire SamzaPipelineOptions to Exeption listener interface (#24109)
---
.../org/apache/beam/runners/samza/runtime/OpAdapter.java | 15 ++++++++++-----
.../samza/util/SamzaPipelineExceptionListener.java | 3 ++-
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index c5353b0e235..cbe81a1359e 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.FutureUtils;
import org.apache.beam.runners.samza.util.SamzaPipelineExceptionListener;
@@ -60,6 +61,7 @@ public class OpAdapter<InT, OutT, K>
private final Op<InT, OutT, K> op;
private final String transformFullName;
+ private final transient SamzaPipelineOptions samzaPipelineOptions;
private transient OpEmitter<OutT> emitter;
private transient Config config;
private transient Context context;
@@ -67,12 +69,14 @@ public class OpAdapter<InT, OutT, K>
public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
Op<InT, OutT, K> op, TranslationContext ctx) {
- return new OpAdapter<>(op, ctx.getTransformFullName());
+ return new OpAdapter<>(op, ctx.getTransformFullName(), ctx.getPipelineOptions());
}
- private OpAdapter(Op<InT, OutT, K> op, String transformFullName) {
+ private OpAdapter(
+ Op<InT, OutT, K> op, String transformFullName, SamzaPipelineOptions samzaPipelineOptions) {
this.op = op;
this.transformFullName = transformFullName;
+ this.samzaPipelineOptions = samzaPipelineOptions;
}
@Override
@@ -113,7 +117,7 @@ public class OpAdapter<InT, OutT, K>
}
} catch (Exception e) {
LOG.error("Exception happened in transform: {}", transformFullName, e);
- notifyExceptionListeners(transformFullName, e);
+ notifyExceptionListeners(transformFullName, e, samzaPipelineOptions);
throw UserCodeException.wrap(e);
}
@@ -217,12 +221,13 @@ public class OpAdapter<InT, OutT, K>
}
}
- private void notifyExceptionListeners(String transformFullName, Exception e) {
+ private void notifyExceptionListeners(
+ String transformFullName, Exception e, SamzaPipelineOptions samzaPipelineOptions) {
try {
exceptionListeners.forEach(
listener -> {
listener
- .getExceptionListener()
+ .getExceptionListener(samzaPipelineOptions)
.onException(new SamzaPipelineExceptionContext(transformFullName, e));
});
} catch (Exception t) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
index 565aa639d45..7715b183b5c 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.samza.util;
import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
/**
* An ExceptionListener following Observer pattern. Any runtime exception caught by {@code
@@ -28,6 +29,6 @@ public interface SamzaPipelineExceptionListener {
void onException(SamzaPipelineExceptionContext exceptionContext);
interface Registrar {
- SamzaPipelineExceptionListener getExceptionListener();
+ SamzaPipelineExceptionListener getExceptionListener(SamzaPipelineOptions samzaPipelineOptions);
}
}