You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2022/09/26 21:06:33 UTC

[beam] branch master updated: Stack Trace Decoration for Beam Samza Runner (#23221)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6851e6bd2ce Stack Trace Decoration for Beam Samza Runner (#23221)
6851e6bd2ce is described below

commit 6851e6bd2cef2670219428a473f95c497dff0d05
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Mon Sep 26 14:06:26 2022 -0700

    Stack Trace Decoration for Beam Samza Runner (#23221)
---
 .../samza/SamzaPipelineExceptionContext.java       | 37 ++++++++++++++++++++++
 .../beam/runners/samza/runtime/OpAdapter.java      | 35 +++++++++++++++++---
 .../translation/FlattenPCollectionsTranslator.java |  3 +-
 .../samza/translation/GroupByKeyTranslator.java    | 20 ++++++------
 .../translation/ParDoBoundMultiTranslator.java     |  8 ++---
 .../translation/SplittableParDoTranslators.java    |  7 ++--
 .../samza/translation/WindowAssignTranslator.java  |  4 +--
 .../samza/util/SamzaPipelineExceptionListener.java | 33 +++++++++++++++++++
 8 files changed, 122 insertions(+), 25 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineExceptionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineExceptionContext.java
new file mode 100644
index 00000000000..5bd02b3fc73
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineExceptionContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+/** Helper that holds the metadata associated with an exception thrown by the Samza Runner. */
+public class SamzaPipelineExceptionContext {
+  private final String transformFullName;
+  private final Exception exception;
+
+  public SamzaPipelineExceptionContext(String transformFullName, Exception exception) {
+    this.transformFullName = transformFullName;
+    this.exception = exception;
+  }
+
+  public String getTransformFullName() {
+    return transformFullName;
+  }
+
+  public Exception getException() {
+    return exception;
+  }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index d04a697b37b..aa090d9b772 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -21,9 +21,14 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.ServiceLoader;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
+import org.apache.beam.runners.samza.translation.TranslationContext;
+import org.apache.beam.runners.samza.util.SamzaPipelineExceptionListener;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.samza.config.Config;
@@ -51,20 +56,23 @@ public class OpAdapter<InT, OutT, K>
   private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
 
   private final Op<InT, OutT, K> op;
+  private final String transformFullName;
   private transient List<OpMessage<OutT>> outputList;
   private transient CompletionStage<Collection<OpMessage<OutT>>> outputFuture;
   private transient Instant outputWatermark;
   private transient OpEmitter<OutT> emitter;
   private transient Config config;
   private transient Context context;
+  private transient List<SamzaPipelineExceptionListener.Registrar> exceptionListeners;
 
   public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
-      Op<InT, OutT, K> op) {
-    return new OpAdapter<>(op);
+      Op<InT, OutT, K> op, TranslationContext ctx) {
+    return new OpAdapter<>(op, ctx.getTransformFullName());
   }
 
-  private OpAdapter(Op<InT, OutT, K> op) {
+  private OpAdapter(Op<InT, OutT, K> op, String transformFullName) {
     this.op = op;
+    this.transformFullName = transformFullName;
   }
 
   @Override
@@ -73,6 +81,11 @@ public class OpAdapter<InT, OutT, K>
     this.emitter = new OpEmitterImpl();
     this.config = context.getJobContext().getConfig();
     this.context = context;
+    this.exceptionListeners =
+        StreamSupport.stream(
+                ServiceLoader.load(SamzaPipelineExceptionListener.Registrar.class).spliterator(),
+                false)
+            .collect(Collectors.toList());
   }
 
   @Override
@@ -102,7 +115,8 @@ public class OpAdapter<InT, OutT, K>
               String.format("Unexpected input type: %s", message.getType()));
       }
     } catch (Exception e) {
-      LOG.error("Op {} threw an exception during processing", this.getClass().getName(), e);
+      LOG.error("Exception happened in transform: {}", transformFullName, e);
+      notifyExceptionListeners(transformFullName, e);
       throw UserCodeException.wrap(e);
     }
 
@@ -191,4 +205,17 @@ public class OpAdapter<InT, OutT, K>
       outputList.add(OpMessage.ofSideInput(id, elements));
     }
   }
+
+  private void notifyExceptionListeners(String transformFullName, Exception e) {
+    try {
+      exceptionListeners.forEach(
+          listener -> {
+            listener
+                .getExceptionListener()
+                .onException(new SamzaPipelineExceptionContext(transformFullName, e));
+          });
+    } catch (Exception t) {
+      // ignore exception/interruption by listeners
+    }
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
index ba1e43965b4..9d529137628 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
@@ -61,7 +61,8 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
       // for some of the validateRunner tests only
       final MessageStream<OpMessage<T>> noOpStream =
           ctx.getDummyStream()
-              .flatMapAsync(OpAdapter.adapt((Op<String, T, Void>) (inputElement, emitter) -> {}));
+              .flatMapAsync(
+                  OpAdapter.adapt((Op<String, T, Void>) (inputElement, emitter) -> {}, ctx));
       ctx.registerMessageStream(output, noOpStream);
       return;
     }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 2982d35b806..6a4f3fb5207 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -100,8 +100,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
             windowingStrategy,
             kvInputCoder,
             elementCoder,
-            ctx.getTransformFullName(),
-            ctx.getTransformId(),
+            ctx,
             outputTag,
             input.isBounded());
 
@@ -175,8 +174,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
         windowingStrategy,
         kvInputCoder,
         elementCoder,
-        ctx.getTransformFullName(),
-        ctx.getTransformId(),
+        ctx,
         outputTag,
         isBounded);
   }
@@ -188,8 +186,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
       WindowingStrategy<?, BoundedWindow> windowingStrategy,
       KvCoder<K, InputT> kvInputCoder,
       Coder<WindowedValue<KV<K, InputT>>> elementCoder,
-      String transformFullName,
-      String transformId,
+      TranslationContext ctx,
       TupleTag<KV<K, OutputT>> outputTag,
       PCollection.IsBounded isBounded) {
     final MessageStream<OpMessage<KV<K, InputT>>> filteredInputStream =
@@ -207,7 +204,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
                   KVSerde.of(
                       SamzaCoders.toSerde(kvInputCoder.getKeyCoder()),
                       SamzaCoders.toSerde(elementCoder)),
-                  "gbk-" + escape(transformId))
+                  "gbk-" + escape(ctx.getTransformId()))
               .map(kv -> OpMessage.ofElement(kv.getValue()));
     }
 
@@ -219,7 +216,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
 
     final MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
         partitionedInputStream
-            .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
+            .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>(), ctx))
             .flatMapAsync(
                 OpAdapter.adapt(
                     new GroupByKeyOp<>(
@@ -228,9 +225,10 @@ class GroupByKeyTranslator<K, InputT, OutputT>
                         reduceFn,
                         windowingStrategy,
                         new DoFnOp.SingleOutputManagerFactory<>(),
-                        transformFullName,
-                        transformId,
-                        isBounded)));
+                        ctx.getTransformFullName(),
+                        ctx.getTransformId(),
+                        isBounded),
+                    ctx));
     return outputStream;
   }
 
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index e032a0d8345..6db09f69d7d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -194,7 +194,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
     }
 
     final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
-        mergedStreams.flatMapAsync(OpAdapter.adapt(op));
+        mergedStreams.flatMapAsync(OpAdapter.adapt(op, ctx));
 
     for (int outputIndex : tagToIndexMap.values()) {
       @SuppressWarnings("unchecked")
@@ -204,7 +204,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
                   message ->
                       message.getType() != OpMessage.Type.ELEMENT
                           || message.getElement().getValue().getUnionTag() == outputIndex)
-              .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
+              .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue(), ctx));
 
       ctx.registerMessageStream(indexToPCollectionMap.get(outputIndex), outputStream);
     }
@@ -345,7 +345,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
     }
 
     final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
-        mergedStreams.flatMapAsync(OpAdapter.adapt(op));
+        mergedStreams.flatMapAsync(OpAdapter.adapt(op, ctx));
 
     for (int outputIndex : tagToIndexMap.values()) {
       @SuppressWarnings("unchecked")
@@ -355,7 +355,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
                   message ->
                       message.getType() != OpMessage.Type.ELEMENT
                           || message.getElement().getValue().getUnionTag() == outputIndex)
-              .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
+              .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue(), ctx));
 
       ctx.registerMessageStream(indexToIdMap.get(outputIndex), outputStream);
     }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
index 60b6bda0d55..0ed7252cf73 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
@@ -119,7 +119,7 @@ public class SplittableParDoTranslators {
 
       final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
           partitionedInputStream
-              .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
+              .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>(), ctx))
               .flatMapAsync(
                   OpAdapter.adapt(
                       new SplittableParDoProcessKeyedElementsOp<>(
@@ -129,7 +129,8 @@ public class SplittableParDoTranslators {
                           new DoFnOp.MultiOutputManagerFactory(tagToIndexMap),
                           ctx.getTransformFullName(),
                           ctx.getTransformId(),
-                          input.isBounded())));
+                          input.isBounded()),
+                      ctx));
 
       for (int outputIndex : tagToIndexMap.values()) {
         @SuppressWarnings("unchecked")
@@ -139,7 +140,7 @@ public class SplittableParDoTranslators {
                     message ->
                         message.getType() != OpMessage.Type.ELEMENT
                             || message.getElement().getValue().getUnionTag() == outputIndex)
-                .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
+                .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue(), ctx));
 
         ctx.registerMessageStream(indexToPCollectionMap.get(outputIndex), outputStream);
       }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
index f85e842d35a..593b5ee3528 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
@@ -47,7 +47,7 @@ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>>
     final MessageStream<OpMessage<T>> inputStream = ctx.getMessageStream(ctx.getInput(transform));
 
     final MessageStream<OpMessage<T>> outputStream =
-        inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
+        inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn), ctx));
 
     ctx.registerMessageStream(output, outputStream);
   }
@@ -73,7 +73,7 @@ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>>
     final MessageStream<OpMessage<T>> inputStream = ctx.getOneInputMessageStream(transform);
 
     final MessageStream<OpMessage<T>> outputStream =
-        inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
+        inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn), ctx));
 
     ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
   }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
new file mode 100644
index 00000000000..565aa639d45
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
+
+/**
+ * An ExceptionListener following Observer pattern. Any runtime exception caught by {@code
+ * OpAdapter} will be notified to any concrete SamzaPipelineExceptionListener at Runtime
+ */
+public interface SamzaPipelineExceptionListener {
+
+  void onException(SamzaPipelineExceptionContext exceptionContext);
+
+  interface Registrar {
+    SamzaPipelineExceptionListener getExceptionListener();
+  }
+}