You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/19 21:11:16 UTC
[kafka] branch 2.0 updated: KAFKA-7066 added better logging in case
of Serialisation issue (#5239)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new bc9bd06 KAFKA-7066 added better logging in case of Serialisation issue (#5239)
bc9bd06 is described below
commit bc9bd06a14c89fcded45e2e5ff2c5b69e4721a87
Author: Stephane Maarek <si...@users.noreply.github.com>
AuthorDate: Wed Jun 20 05:10:13 2018 +0800
KAFKA-7066 added better logging in case of Serialisation issue (#5239)
Following the error message of:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L93
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../apache/kafka/streams/state/StateSerdes.java | 27 ++++++++++++++++++++--
.../kafka/streams/state/StateSerdesTest.java | 17 ++++++++++++++
2 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index f1de82f..ec7803a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
import java.util.Objects;
@@ -165,7 +166,18 @@ public final class StateSerdes<K, V> {
* @return the serialized key
*/
public byte[] rawKey(K key) {
- return keySerde.serializer().serialize(topic, key);
+ try {
+ return keySerde.serializer().serialize(topic, key);
+ } catch (final ClassCastException e) {
+ final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
+ throw new StreamsException(
+ String.format("A serializer (key: %s) is not compatible to the actual key type " +
+ "(key type: %s). Change the default Serdes in StreamConfig or " +
+ "provide correct Serdes via method parameters.",
+ keySerializer().getClass().getName(),
+ keyClass),
+ e);
+ }
}
/**
@@ -175,6 +187,17 @@ public final class StateSerdes<K, V> {
* @return the serialized value
*/
public byte[] rawValue(V value) {
- return valueSerde.serializer().serialize(topic, value);
+ try {
+ return valueSerde.serializer().serialize(topic, value);
+ } catch (final ClassCastException e) {
+ final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+ throw new StreamsException(
+ String.format("A serializer (value: %s) is not compatible to the actual value type " +
+ "(value type: %s). Change the default Serdes in StreamConfig or " +
+ "provide correct Serdes via method parameters.",
+ valueSerializer().getClass().getName(),
+ valueClass),
+ e);
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
index 6f29888..714ce18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.errors.StreamsException;
import org.junit.Assert;
import org.junit.Test;
@@ -86,4 +87,20 @@ public class StateSerdesTest {
new StateSerdes<>("anyName", Serdes.ByteArray(), null);
}
+ @Test(expected = StreamsException.class)
+ public void shouldThrowIfIncompatibleSerdeForValue() throws ClassNotFoundException {
+ Class myClass = Class.forName("java.lang.String");
+ StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
+ Integer myInt = 123;
+ stateSerdes.rawValue(myInt);
+ }
+
+ @Test(expected = StreamsException.class)
+ public void shouldThrowIfIncompatibleSerdeForKey() throws ClassNotFoundException {
+ Class myClass = Class.forName("java.lang.String");
+ StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
+ Integer myInt = 123;
+ stateSerdes.rawKey(myInt);
+ }
+
}