You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/28 18:33:26 UTC

[kafka] branch trunk updated: MINOR: improve error message for Serde type mismatch (#6801)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88619b7  MINOR: improve error message for Serde type mismatch (#6801)
88619b7 is described below

commit 88619b7dd85acd79c111cfdc90a594cb5c2cf96e
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue May 28 11:33:12 2019 -0700

    MINOR: improve error message for Serde type mismatch (#6801)
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Boyang Chen <bo...@confluent.io>
---
 .../apache/kafka/streams/state/StateSerdes.java    | 17 ++++++---
 .../state/internals/ValueAndTimestampSerde.java    |  2 +-
 .../internals/ValueAndTimestampSerializer.java     |  2 +-
 .../kafka/streams/state/StateSerdesTest.java       | 40 +++++++++++++++++++---
 4 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 55e9fde..1182e50 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer;
 
 import java.util.Objects;
 
@@ -171,7 +172,7 @@ public final class StateSerdes<K, V> {
         } catch (final ClassCastException e) {
             final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
             throw new StreamsException(
-                    String.format("A serializer (key: %s) is not compatible to the actual key type " +
+                    String.format("A serializer (%s) is not compatible to the actual key type " +
                                     "(key type: %s). Change the default Serdes in StreamConfig or " +
                                     "provide correct Serdes via method parameters.",
                             keySerializer().getClass().getName(),
@@ -190,12 +191,20 @@ public final class StateSerdes<K, V> {
         try {
             return valueSerde.serializer().serialize(topic, value);
         } catch (final ClassCastException e) {
-            final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+            final String valueClass;
+            final Class<? extends Serializer> serializerClass;
+            if (valueSerializer() instanceof ValueAndTimestampSerializer) {
+                serializerClass = ((ValueAndTimestampSerializer) valueSerializer()).valueSerializer.getClass();
+                valueClass = value == null ? "unknown because value is null" : ((ValueAndTimestamp) value).value().getClass().getName();
+            } else {
+                serializerClass = valueSerializer().getClass();
+                valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+            }
             throw new StreamsException(
-                    String.format("A serializer (value: %s) is not compatible to the actual value type " +
+                    String.format("A serializer (%s) is not compatible to the actual value type " +
                                     "(value type: %s). Change the default Serdes in StreamConfig or " +
                                     "provide correct Serdes via method parameters.",
-                            valueSerializer().getClass().getName(),
+                            serializerClass.getName(),
                             valueClass),
                     e);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index 8be11f3..c02992f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -28,7 +28,7 @@ public class ValueAndTimestampSerde<V> implements Serde<ValueAndTimestamp<V>> {
     private final ValueAndTimestampSerializer<V> valueAndTimestampSerializer;
     private final ValueAndTimestampDeserializer<V> valueAndTimestampDeserializer;
 
-    ValueAndTimestampSerde(final Serde<V> valueSerde) {
+    public ValueAndTimestampSerde(final Serde<V> valueSerde) {
         Objects.requireNonNull(valueSerde);
         valueAndTimestampSerializer = new ValueAndTimestampSerializer<>(valueSerde.serializer());
         valueAndTimestampDeserializer = new ValueAndTimestampDeserializer<>(valueSerde.deserializer());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index 17903952..6db8ccd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
-class ValueAndTimestampSerializer<V> implements Serializer<ValueAndTimestamp<V>> {
+public class ValueAndTimestampSerializer<V> implements Serializer<ValueAndTimestamp<V>> {
     public final Serializer<V> valueSerializer;
     private final Serializer<Long> timestampSerializer;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
index 56ff71d..8cb1c7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
@@ -19,11 +19,16 @@ package org.apache.kafka.streams.state;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThrows;
+
 @SuppressWarnings("unchecked")
 public class StateSerdesTest {
 
@@ -88,20 +93,47 @@ public class StateSerdesTest {
         new StateSerdes<>("anyName", Serdes.ByteArray(), null);
     }
 
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowIfIncompatibleSerdeForValue() throws ClassNotFoundException {
         final Class myClass = Class.forName("java.lang.String");
         final StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
         final Integer myInt = 123;
-        stateSerdes.rawValue(myInt);
+        final Exception e = assertThrows(StreamsException.class, () -> stateSerdes.rawValue(myInt));
+        assertThat(
+            e.getMessage(),
+            equalTo(
+                "A serializer (org.apache.kafka.common.serialization.StringSerializer) " +
+                "is not compatible to the actual value type (value type: java.lang.Integer). " +
+                "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters."));
+    }
+
+    @Test
+    public void shouldSkipValueAndTimestampeInformationForErrorOnTimestampAndValueSerialization() throws ClassNotFoundException {
+        final Class myClass = Class.forName("java.lang.String");
+        final StateSerdes<Object, Object> stateSerdes =
+            new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), new ValueAndTimestampSerde(Serdes.serdeFrom(myClass)));
+        final Integer myInt = 123;
+        final Exception e = assertThrows(StreamsException.class, () -> stateSerdes.rawValue(ValueAndTimestamp.make(myInt, 0L)));
+        assertThat(
+            e.getMessage(),
+            equalTo(
+                "A serializer (org.apache.kafka.common.serialization.StringSerializer) " +
+                    "is not compatible to the actual value type (value type: java.lang.Integer). " +
+                    "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters."));
     }
 
-    @Test(expected = StreamsException.class)
+    @Test
     public void shouldThrowIfIncompatibleSerdeForKey() throws ClassNotFoundException {
         final Class myClass = Class.forName("java.lang.String");
         final StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
         final Integer myInt = 123;
-        stateSerdes.rawKey(myInt);
+        final Exception e = assertThrows(StreamsException.class, () -> stateSerdes.rawKey(myInt));
+        assertThat(
+            e.getMessage(),
+            equalTo(
+                "A serializer (org.apache.kafka.common.serialization.StringSerializer) " +
+                    "is not compatible to the actual key type (key type: java.lang.Integer). " +
+                    "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters."));
     }
 
 }