You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/11 01:06:39 UTC

kafka git commit: MINOR: improve exception message for Serdes incompatible with the actual key/value data types

Repository: kafka
Updated Branches:
  refs/heads/trunk cc62b4f84 -> d8fa4006c


MINOR: improve exception message for Serdes incompatible with the actual key/value data types

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Michael G. Noll, Guozhang Wang

Closes #2118 from mjsax/hotfixImproveSerdeTypeMissmatchError


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d8fa4006
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d8fa4006
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d8fa4006

Branch: refs/heads/trunk
Commit: d8fa4006cb824aa2a8ab890b6754c920ac17182c
Parents: cc62b4f
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Nov 10 17:06:36 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 10 17:06:36 2016 -0800

----------------------------------------------------------------------
 .../streams/processor/internals/SinkNode.java   |  14 +-
 .../processor/internals/SinkNodeTest.java       | 132 +++++--------------
 2 files changed, 43 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fa4006/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 8ac373b..2f20cdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -79,7 +79,19 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
                 "Use a different TimestampExtractor to process this data.");
         }
 
-        collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
+        try {
+            collector.send(new ProducerRecord<K, V>(topic, null, timestamp, key, value), keySerializer, valSerializer, partitioner);
+        } catch (ClassCastException e) {
+            throw new StreamsException(
+                    String.format("A serializer (key: %s / value: %s) is not compatible to the actual key or value type " +
+                                    "(key type: %s / value type: %s). Change the default Serdes in StreamConfig or " +
+                                    "provide correct Serdes via method parameters.",
+                                    keySerializer.getClass().getName(),
+                                    valSerializer.getClass().getName(),
+                                    key.getClass().getName(),
+                                    value.getClass().getName()),
+                    e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d8fa4006/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 3b41517..51dc7d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -17,19 +17,17 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
-import java.io.File;
-import java.util.Map;
+import java.util.Properties;
 
 public class SinkNodeTest {
 
@@ -37,108 +35,38 @@ public class SinkNodeTest {
     @SuppressWarnings("unchecked")
     public void invalidInputRecordTimestampTest() {
         final Serializer anySerializer = Serdes.Bytes().serializer();
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollector(null, null));
+        context.setTime(-1);
 
         final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
-        sink.init(new MockProcessorContext());
+        sink.init(context);
 
         sink.process(null, null);
     }
 
-    private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
-        private final long invalidTimestamp = -1;
-
-        @Override
-        public String applicationId() {
-            return null;
-        }
-
-        @Override
-        public TaskId taskId() {
-            return null;
-        }
-
-        @Override
-        public Serde<?> keySerde() {
-            return null;
-        }
-
-        @Override
-        public Serde<?> valueSerde() {
-            return null;
-        }
-
-        @Override
-        public File stateDir() {
-            return null;
-        }
-
-        @Override
-        public StreamsMetrics metrics() {
-            return null;
-        }
-
-        @Override
-        public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
-        }
-
-        @Override
-        public StateStore getStateStore(String name) {
-            return null;
-        }
-
-        @Override
-        public void schedule(long interval) {
-        }
-
-        @Override
-        public <K, V> void forward(K key, V value) {
-        }
-
-        @Override
-        public <K, V> void forward(K key, V value, int childIndex) {
-        }
-
-        @Override
-        public <K, V> void forward(K key, V value, String childName) {
-        }
-
-        @Override
-        public void commit() {
-        }
-
-        @Override
-        public String topic() {
-            return null;
-        }
-
-        @Override
-        public int partition() {
-            return 0;
-        }
-
-        @Override
-        public long offset() {
-            return 0;
-        }
-
-        @Override
-        public long timestamp() {
-            return invalidTimestamp;
-        }
-
-        @Override
-        public Map<String, Object> appConfigs() {
-            return null;
-        }
+    @Test(expected = StreamsException.class)
+    @SuppressWarnings("unchecked")
+    public void shouldThrowStreamsExceptionOnKeyValyeTypeSerializerMissmatch() {
+        final Serializer anySerializer = Serdes.Bytes().serializer();
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
 
-        @Override
-        public Map<String, Object> appConfigsWithPrefix(String prefix) {
-            return null;
-        }
+        Properties config = new Properties();
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollector(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+        context.setTime(0);
 
-        @Override
-        public RecordCollector recordCollector() {
-            return null;
+        final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+        sink.init(context);
+
+        try {
+            sink.process("", "");
+        } catch (final StreamsException e) {
+            if (e.getCause() instanceof ClassCastException) {
+                throw e;
+            }
+            throw new RuntimeException(e);
         }
     }