You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/19 21:11:16 UTC

[kafka] branch 2.0 updated: KAFKA-7066 added better logging in case of Serialisation issue (#5239)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new bc9bd06  KAFKA-7066 added better logging in case of Serialisation issue (#5239)
bc9bd06 is described below

commit bc9bd06a14c89fcded45e2e5ff2c5b69e4721a87
Author: Stephane Maarek <si...@users.noreply.github.com>
AuthorDate: Wed Jun 20 05:10:13 2018 +0800

    KAFKA-7066 added better logging in case of Serialisation issue (#5239)
    
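    Wrap a ClassCastException thrown by the key or value serializer in
    StateSerdes in a StreamsException whose message names the serializer
    class and the actual key/value type, following the error message of: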
    
    https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L93
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/streams/state/StateSerdes.java    | 27 ++++++++++++++++++++--
 .../kafka/streams/state/StateSerdesTest.java       | 17 ++++++++++++++
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index f1de82f..ec7803a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.util.Objects;
 
@@ -165,7 +166,18 @@ public final class StateSerdes<K, V> {
      * @return     the serialized key
      */
     public byte[] rawKey(K key) {
-        return keySerde.serializer().serialize(topic, key);
+        try {
+            return keySerde.serializer().serialize(topic, key);
+        } catch (final ClassCastException e) {
+            final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
+            throw new StreamsException(
+                    String.format("A serializer (key: %s) is not compatible to the actual key type " +
+                                    "(key type: %s). Change the default Serdes in StreamConfig or " +
+                                    "provide correct Serdes via method parameters.",
+                            keySerializer().getClass().getName(),
+                            keyClass),
+                    e);
+        }
     }
 
     /**
@@ -175,6 +187,17 @@ public final class StateSerdes<K, V> {
      * @return       the serialized value
      */
     public byte[] rawValue(V value) {
-        return valueSerde.serializer().serialize(topic, value);
+        try {
+            return valueSerde.serializer().serialize(topic, value);
+        } catch (final ClassCastException e) {
+            final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+            throw new StreamsException(
+                    String.format("A serializer (value: %s) is not compatible to the actual value type " +
+                                    "(value type: %s). Change the default Serdes in StreamConfig or " +
+                                    "provide correct Serdes via method parameters.",
+                            valueSerializer().getClass().getName(),
+                            valueClass),
+                    e);
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
index 6f29888..714ce18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -86,4 +87,20 @@ public class StateSerdesTest {
         new StateSerdes<>("anyName", Serdes.ByteArray(), null);
     }
 
+    @Test(expected = StreamsException.class)
+    public void shouldThrowIfIncompatibleSerdeForValue() throws ClassNotFoundException {
+        Class myClass = Class.forName("java.lang.String");
+        StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
+        Integer myInt = 123;
+        stateSerdes.rawValue(myInt);
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowIfIncompatibleSerdeForKey() throws ClassNotFoundException {
+        Class myClass = Class.forName("java.lang.String");
+        StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
+        Integer myInt = 123;
+        stateSerdes.rawKey(myInt);
+    }
+
 }