You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/28 18:33:26 UTC
[kafka] branch trunk updated: MINOR: improve error message for
Serde type miss match (#6801)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88619b7 MINOR: improve error message for Serde type miss match (#6801)
88619b7 is described below
commit 88619b7dd85acd79c111cfdc90a594cb5c2cf96e
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue May 28 11:33:12 2019 -0700
MINOR: improve error message for Serde type miss match (#6801)
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Boyang Chen <bo...@confluent.io>
---
.../apache/kafka/streams/state/StateSerdes.java | 17 ++++++---
.../state/internals/ValueAndTimestampSerde.java | 2 +-
.../internals/ValueAndTimestampSerializer.java | 2 +-
.../kafka/streams/state/StateSerdesTest.java | 40 +++++++++++++++++++---
4 files changed, 51 insertions(+), 10 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 55e9fde..1182e50 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer;
import java.util.Objects;
@@ -171,7 +172,7 @@ public final class StateSerdes<K, V> {
} catch (final ClassCastException e) {
final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
throw new StreamsException(
- String.format("A serializer (key: %s) is not compatible to the actual key type " +
+ String.format("A serializer (%s) is not compatible to the actual key type " +
"(key type: %s). Change the default Serdes in StreamConfig or " +
"provide correct Serdes via method parameters.",
keySerializer().getClass().getName(),
@@ -190,12 +191,20 @@ public final class StateSerdes<K, V> {
try {
return valueSerde.serializer().serialize(topic, value);
} catch (final ClassCastException e) {
- final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+ final String valueClass;
+ final Class<? extends Serializer> serializerClass;
+ if (valueSerializer() instanceof ValueAndTimestampSerializer) {
+ serializerClass = ((ValueAndTimestampSerializer) valueSerializer()).valueSerializer.getClass();
+ valueClass = value == null ? "unknown because value is null" : ((ValueAndTimestamp) value).value().getClass().getName();
+ } else {
+ serializerClass = valueSerializer().getClass();
+ valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
+ }
throw new StreamsException(
- String.format("A serializer (value: %s) is not compatible to the actual value type " +
+ String.format("A serializer (%s) is not compatible to the actual value type " +
"(value type: %s). Change the default Serdes in StreamConfig or " +
"provide correct Serdes via method parameters.",
- valueSerializer().getClass().getName(),
+ serializerClass.getName(),
valueClass),
e);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
index 8be11f3..c02992f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerde.java
@@ -28,7 +28,7 @@ public class ValueAndTimestampSerde<V> implements Serde<ValueAndTimestamp<V>> {
private final ValueAndTimestampSerializer<V> valueAndTimestampSerializer;
private final ValueAndTimestampDeserializer<V> valueAndTimestampDeserializer;
- ValueAndTimestampSerde(final Serde<V> valueSerde) {
+ public ValueAndTimestampSerde(final Serde<V> valueSerde) {
Objects.requireNonNull(valueSerde);
valueAndTimestampSerializer = new ValueAndTimestampSerializer<>(valueSerde.serializer());
valueAndTimestampDeserializer = new ValueAndTimestampDeserializer<>(valueSerde.deserializer());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index 17903952..6db8ccd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
-class ValueAndTimestampSerializer<V> implements Serializer<ValueAndTimestamp<V>> {
+public class ValueAndTimestampSerializer<V> implements Serializer<ValueAndTimestamp<V>> {
public final Serializer<V> valueSerializer;
private final Serializer<Long> timestampSerializer;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
index 56ff71d..8cb1c7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateSerdesTest.java
@@ -19,11 +19,16 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThrows;
+
@SuppressWarnings("unchecked")
public class StateSerdesTest {
@@ -88,20 +93,47 @@ public class StateSerdesTest {
new StateSerdes<>("anyName", Serdes.ByteArray(), null);
}
- @Test(expected = StreamsException.class)
+ @Test
public void shouldThrowIfIncompatibleSerdeForValue() throws ClassNotFoundException {
final Class myClass = Class.forName("java.lang.String");
final StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
final Integer myInt = 123;
- stateSerdes.rawValue(myInt);
+ final Exception e = assertThrows(StreamsException.class, () -> stateSerdes.rawValue(myInt));
+ assertThat(
+ e.getMessage(),
+ equalTo(
+ "A serializer (org.apache.kafka.common.serialization.StringSerializer) " +
+ "is not compatible to the actual value type (value type: java.lang.Integer). " +
+ "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters."));
+ }
+
+ @Test
+ public void shouldSkipValueAndTimestampeInformationForErrorOnTimestampAndValueSerialization() throws ClassNotFoundException {
+ final Class myClass = Class.forName("java.lang.String");
+ final StateSerdes<Object, Object> stateSerdes =
+ new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), new ValueAndTimestampSerde(Serdes.serdeFrom(myClass)));
+ final Integer myInt = 123;
+ final Exception e = assertThrows(StreamsException.class, () -> stateSerdes.rawValue(ValueAndTimestamp.make(myInt, 0L)));
+ assertThat(
+ e.getMessage(),
+ equalTo(
+ "A serializer (org.apache.kafka.common.serialization.StringSerializer) " +
+ "is not compatible to the actual value type (value type: java.lang.Integer). " +
+ "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters."));
}
- @Test(expected = StreamsException.class)
+ @Test
public void shouldThrowIfIncompatibleSerdeForKey() throws ClassNotFoundException {
final Class myClass = Class.forName("java.lang.String");
final StateSerdes<Object, Object> stateSerdes = new StateSerdes<Object, Object>("anyName", Serdes.serdeFrom(myClass), Serdes.serdeFrom(myClass));
final Integer myInt = 123;
- stateSerdes.rawKey(myInt);
+ final Exception e = assertThrows(StreamsException.class, () -> stateSerdes.rawKey(myInt));
+ assertThat(
+ e.getMessage(),
+ equalTo(
+ "A serializer (org.apache.kafka.common.serialization.StringSerializer) " +
+ "is not compatible to the actual key type (key type: java.lang.Integer). " +
+ "Change the default Serdes in StreamConfig or provide correct Serdes via method parameters."));
}
}