You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/08/28 03:08:10 UTC
[kafka] branch trunk updated: KAFKA-12963: Add processor name to
error (#11262)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 84b111f KAFKA-12963: Add processor name to error (#11262)
84b111f is described below
commit 84b111f968a47c65d3498a1b3af42dd644403728
Author: Andy Lapidas <al...@gmail.com>
AuthorDate: Fri Aug 27 22:06:49 2021 -0500
KAFKA-12963: Add processor name to error (#11262)
This PR adds the processor name to the ClassCastException exception text in process()
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../org/apache/kafka/streams/processor/internals/ProcessorNode.java | 3 ++-
.../apache/kafka/streams/processor/internals/ProcessorNodeTest.java | 3 ++-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index dfc0b70..48c95f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -147,7 +147,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
} catch (final ClassCastException e) {
final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName();
final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName();
- throw new StreamsException(String.format("ClassCastException invoking Processor. Do the Processor's "
+ throw new StreamsException(String.format("ClassCastException invoking processor: %s. Do the Processor's "
+ "input types match the deserialized types? Check the Serde setup and change the default Serdes in "
+ "StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept "
+ "the deserialized input of type key: %s, and value: %s.%n"
@@ -155,6 +155,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
+ "another cause (in user code, for example). For example, if a processor wires in a store, but casts "
+ "the generics incorrectly, a class cast exception could be raised during processing, but the "
+ "cause would not be wrong Serdes.",
+ this.name(),
keyClass,
valueClass),
e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 73147ed..87a4c68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -193,7 +193,7 @@ public class ProcessorNodeTest {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
- final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
+ final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
node.init(context);
final StreamsException se = assertThrows(
StreamsException.class,
@@ -202,5 +202,6 @@ public class ProcessorNodeTest {
assertThat(se.getCause(), instanceOf(ClassCastException.class));
assertThat(se.getMessage(), containsString("default Serdes"));
assertThat(se.getMessage(), containsString("input types"));
+ assertThat(se.getMessage(), containsString("pname"));
}
}