You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2022/09/26 21:06:33 UTC
[beam] branch master updated: Stack Trace Decoration for Beam Samza Runner (#23221)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6851e6bd2ce Stack Trace Decoration for Beam Samza Runner (#23221)
6851e6bd2ce is described below
commit 6851e6bd2cef2670219428a473f95c497dff0d05
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Mon Sep 26 14:06:26 2022 -0700
Stack Trace Decoration for Beam Samza Runner (#23221)
---
.../samza/SamzaPipelineExceptionContext.java | 37 ++++++++++++++++++++++
.../beam/runners/samza/runtime/OpAdapter.java | 35 +++++++++++++++++---
.../translation/FlattenPCollectionsTranslator.java | 3 +-
.../samza/translation/GroupByKeyTranslator.java | 20 ++++++------
.../translation/ParDoBoundMultiTranslator.java | 8 ++---
.../translation/SplittableParDoTranslators.java | 7 ++--
.../samza/translation/WindowAssignTranslator.java | 4 +--
.../samza/util/SamzaPipelineExceptionListener.java | 33 +++++++++++++++++++
8 files changed, 122 insertions(+), 25 deletions(-)
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineExceptionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineExceptionContext.java
new file mode 100644
index 00000000000..5bd02b3fc73
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineExceptionContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+/** Carries the metadata associated with an exception thrown by the Samza Runner. */
+public class SamzaPipelineExceptionContext {
+ private final String transformFullName; // full name of the transform the exception is tied to
+ private final Exception exception; // the exception being reported
+
+ public SamzaPipelineExceptionContext(String transformFullName, Exception exception) {
+ this.transformFullName = transformFullName;
+ this.exception = exception;
+ }
+
+ public String getTransformFullName() {
+ return transformFullName;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index d04a697b37b..aa090d9b772 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -21,9 +21,14 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
+import org.apache.beam.runners.samza.translation.TranslationContext;
+import org.apache.beam.runners.samza.util.SamzaPipelineExceptionListener;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.config.Config;
@@ -51,20 +56,23 @@ public class OpAdapter<InT, OutT, K>
private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
private final Op<InT, OutT, K> op;
+ private final String transformFullName; // full name of the wrapped transform, used in error reporting
private transient List<OpMessage<OutT>> outputList;
private transient CompletionStage<Collection<OpMessage<OutT>>> outputFuture;
private transient Instant outputWatermark;
private transient OpEmitter<OutT> emitter;
private transient Config config;
private transient Context context;
+ private transient List<SamzaPipelineExceptionListener.Registrar> exceptionListeners; // found via ServiceLoader
public static <InT, OutT, K> AsyncFlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
- Op<InT, OutT, K> op) {
- return new OpAdapter<>(op);
+ Op<InT, OutT, K> op, TranslationContext ctx) { // ctx supplies the transform's full name
+ return new OpAdapter<>(op, ctx.getTransformFullName());
}
- private OpAdapter(Op<InT, OutT, K> op) {
+ private OpAdapter(Op<InT, OutT, K> op, String transformFullName) {
this.op = op;
+ this.transformFullName = transformFullName;
}
@Override
@@ -73,6 +81,11 @@ public class OpAdapter<InT, OutT, K>
this.emitter = new OpEmitterImpl();
this.config = context.getJobContext().getConfig();
this.context = context;
+ this.exceptionListeners =
+ StreamSupport.stream(
+ ServiceLoader.load(SamzaPipelineExceptionListener.Registrar.class).spliterator(),
+ false)
+ .collect(Collectors.toList());
}
@Override
@@ -102,7 +115,8 @@ public class OpAdapter<InT, OutT, K>
String.format("Unexpected input type: %s", message.getType()));
}
} catch (Exception e) {
- LOG.error("Op {} threw an exception during processing", this.getClass().getName(), e);
+ LOG.error("Exception happened in transform: {}", transformFullName, e);
+ notifyExceptionListeners(transformFullName, e);
throw UserCodeException.wrap(e);
}
@@ -191,4 +205,17 @@ public class OpAdapter<InT, OutT, K>
outputList.add(OpMessage.ofSideInput(id, elements));
}
}
+
+  /** Notifies each registered listener; a listener that throws never prevents the others. */
+  private void notifyExceptionListeners(String transformFullName, Exception e) {
+    for (SamzaPipelineExceptionListener.Registrar registrar : exceptionListeners) {
+      try {
+        registrar.getExceptionListener()
+            .onException(new SamzaPipelineExceptionContext(transformFullName, e));
+      } catch (Exception ex) {
+        // Best effort: log and continue so the caller can still rethrow the original exception.
+        LOG.warn("Exception listener failed for transform: {}", transformFullName, ex);
+      }
+    }
+  }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
index ba1e43965b4..9d529137628 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
@@ -61,7 +61,8 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
// for some of the validateRunner tests only
final MessageStream<OpMessage<T>> noOpStream =
ctx.getDummyStream()
- .flatMapAsync(OpAdapter.adapt((Op<String, T, Void>) (inputElement, emitter) -> {}));
+ .flatMapAsync(
+ OpAdapter.adapt((Op<String, T, Void>) (inputElement, emitter) -> {}, ctx));
ctx.registerMessageStream(output, noOpStream);
return;
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 2982d35b806..6a4f3fb5207 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -100,8 +100,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
windowingStrategy,
kvInputCoder,
elementCoder,
- ctx.getTransformFullName(),
- ctx.getTransformId(),
+ ctx,
outputTag,
input.isBounded());
@@ -175,8 +174,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
windowingStrategy,
kvInputCoder,
elementCoder,
- ctx.getTransformFullName(),
- ctx.getTransformId(),
+ ctx,
outputTag,
isBounded);
}
@@ -188,8 +186,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
WindowingStrategy<?, BoundedWindow> windowingStrategy,
KvCoder<K, InputT> kvInputCoder,
Coder<WindowedValue<KV<K, InputT>>> elementCoder,
- String transformFullName,
- String transformId,
+ TranslationContext ctx,
TupleTag<KV<K, OutputT>> outputTag,
PCollection.IsBounded isBounded) {
final MessageStream<OpMessage<KV<K, InputT>>> filteredInputStream =
@@ -207,7 +204,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
KVSerde.of(
SamzaCoders.toSerde(kvInputCoder.getKeyCoder()),
SamzaCoders.toSerde(elementCoder)),
- "gbk-" + escape(transformId))
+ "gbk-" + escape(ctx.getTransformId()))
.map(kv -> OpMessage.ofElement(kv.getValue()));
}
@@ -219,7 +216,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
final MessageStream<OpMessage<KV<K, OutputT>>> outputStream =
partitionedInputStream
- .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
+ .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>(), ctx))
.flatMapAsync(
OpAdapter.adapt(
new GroupByKeyOp<>(
@@ -228,9 +225,10 @@ class GroupByKeyTranslator<K, InputT, OutputT>
reduceFn,
windowingStrategy,
new DoFnOp.SingleOutputManagerFactory<>(),
- transformFullName,
- transformId,
- isBounded)));
+ ctx.getTransformFullName(),
+ ctx.getTransformId(),
+ isBounded),
+ ctx));
return outputStream;
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index e032a0d8345..6db09f69d7d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -194,7 +194,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
}
final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
- mergedStreams.flatMapAsync(OpAdapter.adapt(op));
+ mergedStreams.flatMapAsync(OpAdapter.adapt(op, ctx));
for (int outputIndex : tagToIndexMap.values()) {
@SuppressWarnings("unchecked")
@@ -204,7 +204,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
message ->
message.getType() != OpMessage.Type.ELEMENT
|| message.getElement().getValue().getUnionTag() == outputIndex)
- .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
+ .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue(), ctx));
ctx.registerMessageStream(indexToPCollectionMap.get(outputIndex), outputStream);
}
@@ -345,7 +345,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
}
final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
- mergedStreams.flatMapAsync(OpAdapter.adapt(op));
+ mergedStreams.flatMapAsync(OpAdapter.adapt(op, ctx));
for (int outputIndex : tagToIndexMap.values()) {
@SuppressWarnings("unchecked")
@@ -355,7 +355,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
message ->
message.getType() != OpMessage.Type.ELEMENT
|| message.getElement().getValue().getUnionTag() == outputIndex)
- .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
+ .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue(), ctx));
ctx.registerMessageStream(indexToIdMap.get(outputIndex), outputStream);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
index 60b6bda0d55..0ed7252cf73 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SplittableParDoTranslators.java
@@ -119,7 +119,7 @@ public class SplittableParDoTranslators {
final MessageStream<OpMessage<RawUnionValue>> taggedOutputStream =
partitionedInputStream
- .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>()))
+ .flatMapAsync(OpAdapter.adapt(new KvToKeyedWorkItemOp<>(), ctx))
.flatMapAsync(
OpAdapter.adapt(
new SplittableParDoProcessKeyedElementsOp<>(
@@ -129,7 +129,8 @@ public class SplittableParDoTranslators {
new DoFnOp.MultiOutputManagerFactory(tagToIndexMap),
ctx.getTransformFullName(),
ctx.getTransformId(),
- input.isBounded())));
+ input.isBounded()),
+ ctx));
for (int outputIndex : tagToIndexMap.values()) {
@SuppressWarnings("unchecked")
@@ -139,7 +140,7 @@ public class SplittableParDoTranslators {
message ->
message.getType() != OpMessage.Type.ELEMENT
|| message.getElement().getValue().getUnionTag() == outputIndex)
- .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue()));
+ .flatMapAsync(OpAdapter.adapt(new RawUnionValueToValue(), ctx));
ctx.registerMessageStream(indexToPCollectionMap.get(outputIndex), outputStream);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
index f85e842d35a..593b5ee3528 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/WindowAssignTranslator.java
@@ -47,7 +47,7 @@ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>>
final MessageStream<OpMessage<T>> inputStream = ctx.getMessageStream(ctx.getInput(transform));
final MessageStream<OpMessage<T>> outputStream =
- inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
+ inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn), ctx));
ctx.registerMessageStream(output, outputStream);
}
@@ -73,7 +73,7 @@ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>>
final MessageStream<OpMessage<T>> inputStream = ctx.getOneInputMessageStream(transform);
final MessageStream<OpMessage<T>> outputStream =
- inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn)));
+ inputStream.flatMapAsync(OpAdapter.adapt(new WindowAssignOp<>(windowFn), ctx));
ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
new file mode 100644
index 00000000000..565aa639d45
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/util/SamzaPipelineExceptionListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.util;
+
+import org.apache.beam.runners.samza.SamzaPipelineExceptionContext;
+
+/**
+ * Observer of exceptions raised while the Samza Runner processes a transform. Exceptions caught by
+ * {@code OpAdapter} while processing messages are reported to the discovered listeners.
+ */
+public interface SamzaPipelineExceptionListener {
+ /** Invoked with the context (transform full name and exception) of a caught exception. */
+ void onException(SamzaPipelineExceptionContext exceptionContext);
+ /** Supplies a listener; implementations are looked up with {@code ServiceLoader}. */
+ interface Registrar {
+ SamzaPipelineExceptionListener getExceptionListener();
+ }
+}