You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2022/11/11 00:47:40 UTC

[beam] branch master updated: Wire SamzaPipelineOptions to Exception listener interface (#24109)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f8f089a5f5 Wire SamzaPipelineOptions to Exception listener interface (#24109)
8f8f089a5f5 is described below

commit 8f8f089a5f565f179905984cef3522a0577d9219
Author: Sanil Jain <sn...@linkedin.com>
AuthorDate: Thu Nov 10 16:47:31 2022 -0800

    Wire SamzaPipelineOptions to Exception listener interface (#24109)
---
 .../org/apache/beam/runners/samza/runtime/OpAdapter.java  | 15 ++++++++++-----
 .../samza/util/SamzaPipelineExceptionListener.java        |  3 ++-
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index c5353b0e235..cbe81a1359e 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.translation.TranslationContext;
 import org.apache.beam.runners.samza.util.FutureUtils;
 import org.apache.beam.runners.samza.util.SamzaPipelineExceptionListener;
@@ -60,6 +61,7 @@ public class OpAdapter<InT, OutT, K>
 
   private final Op<InT, OutT, K> op;
   private final String transformFullName;
+  private final transient SamzaPipelineOptions samzaPipelineOptions;
   private transient OpEmitter<OutT> emitter;
   private transient Config config;
   private transient Context context;
@@ -67,12 +69,14 @@ public class OpAdapter<InT, OutT, K>
 
   public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
       Op<InT, OutT, K> op, TranslationContext ctx) {
-    return new OpAdapter<>(op, ctx.getTransformFullName());
+    return new OpAdapter<>(op, ctx.getTransformFullName(), ctx.getPipelineOptions());
   }
 
-  private OpAdapter(Op<InT, OutT, K> op, String transformFullName) {
+  private OpAdapter(
+      Op<InT, OutT, K> op, String transformFullName, SamzaPipelineOptions samzaPipelineOptions) {
     this.op = op;
     this.transformFullName = transformFullName;
+    this.samzaPipelineOptions = samzaPipelineOptions;
   }
 
   @Override
@@ -113,7 +117,7 @@ public class OpAdapter<InT, OutT, K>
       }
     } catch (Exception e) {
       LOG.error("Exception happened in transform: {}", transformFullName, e);
-      notifyExceptionListeners(transformFullName, e);
+      notifyExceptionListeners(transformFullName, e, samzaPipelineOptions);
       throw UserCodeException.wrap(e);
     }
 
@@ -217,12 +221,13 @@ public class OpAdapter<InT, OutT, K>
     }
   }
 
-  private void notifyExceptionListeners(String transformFullName, Exception e) {
+  private void notifyExceptionListeners(
+      String transformFullName, Exception e, SamzaPipelineOptions samzaPipelineOptions) {
     try {
       exceptionListeners.forEach(
           listener -> {
             listener
-                .getExceptionListener()
+                .getExceptionListener(samzaPipelineOptions)
                 .onException(new SamzaPipelineExceptionContext(transformFullName, e));
           });
     } catch (Exception t) {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
index 565aa639d45..7715b183b5c 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.samza.util;
 
 import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
 
 /**
  * An ExceptionListener following Observer pattern. Any runtime exception caught by {@code
@@ -28,6 +29,6 @@ public interface SamzaPipelineExceptionListener {
   void onException(SamzaPipelineExceptionContext exceptionContext);
 
   interface Registrar {
-    SamzaPipelineExceptionListener getExceptionListener();
+    SamzaPipelineExceptionListener getExceptionListener(SamzaPipelineOptions samzaPipelineOptions);
   }
 }