You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/08/28 03:08:10 UTC

[kafka] branch trunk updated: KAFKA-12963: Add processor name to error (#11262)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84b111f  KAFKA-12963: Add processor name to error (#11262)
84b111f is described below

commit 84b111f968a47c65d3498a1b3af42dd644403728
Author: Andy Lapidas <al...@gmail.com>
AuthorDate: Fri Aug 27 22:06:49 2021 -0500

    KAFKA-12963: Add processor name to error (#11262)
    
    This PR adds the processor name to the ClassCastException exception text in process()
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../org/apache/kafka/streams/processor/internals/ProcessorNode.java    | 3 ++-
 .../apache/kafka/streams/processor/internals/ProcessorNodeTest.java    | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index dfc0b70..48c95f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -147,7 +147,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         } catch (final ClassCastException e) {
             final String keyClass = record.key() == null ? "unknown because key is null" : record.key().getClass().getName();
             final String valueClass = record.value() == null ? "unknown because value is null" : record.value().getClass().getName();
-            throw new StreamsException(String.format("ClassCastException invoking Processor. Do the Processor's "
+            throw new StreamsException(String.format("ClassCastException invoking processor: %s. Do the Processor's "
                     + "input types match the deserialized types? Check the Serde setup and change the default Serdes in "
                     + "StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept "
                     + "the deserialized input of type key: %s, and value: %s.%n"
@@ -155,6 +155,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                     + "another cause (in user code, for example). For example, if a processor wires in a store, but casts "
                     + "the generics incorrectly, a class cast exception could be raised during processing, but the "
                     + "cause would not be wrong Serdes.",
+                    this.name(),
                     keyClass,
                     valueClass),
                 e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 73147ed..87a4c68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -193,7 +193,7 @@ public class ProcessorNodeTest {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new InternalMockProcessorContext<>(streamsMetrics);
-        final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
+        final ProcessorNode<Object, Object, Object, Object> node = new ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
         node.init(context);
         final StreamsException se = assertThrows(
             StreamsException.class,
@@ -202,5 +202,6 @@ public class ProcessorNodeTest {
         assertThat(se.getCause(), instanceOf(ClassCastException.class));
         assertThat(se.getMessage(), containsString("default Serdes"));
         assertThat(se.getMessage(), containsString("input types"));
+        assertThat(se.getMessage(), containsString("pname"));
     }
 }