You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/11 01:06:39 UTC
kafka git commit: MINOR: improve exception message for incompatible Serdes to actual key/value data types
Repository: kafka
Updated Branches:
refs/heads/trunk cc62b4f84 -> d8fa4006c
MINOR: improve exception message for incompatible Serdes to actual key/value data types
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Michael G. Noll, Guozhang Wang
Closes #2118 from mjsax/hotfixImproveSerdeTypeMissmatchError
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d8fa4006
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d8fa4006
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d8fa4006
Branch: refs/heads/trunk
Commit: d8fa4006cb824aa2a8ab890b6754c920ac17182c
Parents: cc62b4f
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Nov 10 17:06:36 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 10 17:06:36 2016 -0800
----------------------------------------------------------------------
.../streams/processor/internals/SinkNode.java | 14 +-
.../processor/internals/SinkNodeTest.java | 132 +++++--------------
2 files changed, 43 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fa4006/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 8ac373b..2f20cdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -79,7 +79,19 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
"Use a different TimestampExtractor to process this data.");
}
- collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
+ try {
+ collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
+ } catch (ClassCastException e) {
+ throw new StreamsException(
+ String.format("A serializer (key: %s / value: %s) is not compatible to the actual key or value type " +
+ "(key type: %s / value type: %s). Change the default Serdes in StreamConfig or " +
+ "provide correct Serdes via method parameters.",
+ keySerializer.getClass().getName(),
+ valSerializer.getClass().getName(),
+ key.getClass().getName(),
+ value.getClass().getName()),
+ e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fa4006/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 3b41517..51dc7d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -17,19 +17,17 @@
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
-import java.io.File;
-import java.util.Map;
+import java.util.Properties;
public class SinkNodeTest {
@@ -37,108 +35,38 @@ public class SinkNodeTest {
@SuppressWarnings("unchecked")
public void invalidInputRecordTimestampTest() {
final Serializer anySerializer = Serdes.Bytes().serializer();
+ final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+
+ final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollector(null, null));
+ context.setTime(-1);
final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
- sink.init(new MockProcessorContext());
+ sink.init(context);
sink.process(null, null);
}
- private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
- private final long invalidTimestamp = -1;
-
- @Override
- public String applicationId() {
- return null;
- }
-
- @Override
- public TaskId taskId() {
- return null;
- }
-
- @Override
- public Serde<?> keySerde() {
- return null;
- }
-
- @Override
- public Serde<?> valueSerde() {
- return null;
- }
-
- @Override
- public File stateDir() {
- return null;
- }
-
- @Override
- public StreamsMetrics metrics() {
- return null;
- }
-
- @Override
- public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
- }
-
- @Override
- public StateStore getStateStore(String name) {
- return null;
- }
-
- @Override
- public void schedule(long interval) {
- }
-
- @Override
- public <K, V> void forward(K key, V value) {
- }
-
- @Override
- public <K, V> void forward(K key, V value, int childIndex) {
- }
-
- @Override
- public <K, V> void forward(K key, V value, String childName) {
- }
-
- @Override
- public void commit() {
- }
-
- @Override
- public String topic() {
- return null;
- }
-
- @Override
- public int partition() {
- return 0;
- }
-
- @Override
- public long offset() {
- return 0;
- }
-
- @Override
- public long timestamp() {
- return invalidTimestamp;
- }
-
- @Override
- public Map<String, Object> appConfigs() {
- return null;
- }
+ @Test(expected = StreamsException.class)
+ @SuppressWarnings("unchecked")
+ public void shouldThrowStreamsExceptionOnKeyValyeTypeSerializerMissmatch() {
+ final Serializer anySerializer = Serdes.Bytes().serializer();
+ final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
- @Override
- public Map<String, Object> appConfigsWithPrefix(String prefix) {
- return null;
- }
+ Properties config = new Properties();
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollector(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+ context.setTime(0);
- @Override
- public RecordCollector recordCollector() {
- return null;
+ final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+ sink.init(context);
+
+ try {
+ sink.process("", "");
+ } catch (final StreamsException e) {
+ if (e.getCause() instanceof ClassCastException) {
+ throw e;
+ }
+ throw new RuntimeException(e);
}
}