You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/17 22:18:09 UTC

kafka git commit: MINOR: Guard against NPE when throwing StreamsException on serializer mismatch

Repository: kafka
Updated Branches:
  refs/heads/trunk 9e787716b -> 783900c25


MINOR: Guard against NPE when throwing StreamsException on serializer mismatch

Author: Michael G. Noll <mi...@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2696 from miguno/trunk-sinknode-NPE


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/783900c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/783900c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/783900c2

Branch: refs/heads/trunk
Commit: 783900c259511f86f5af03cbd96f2e74833447b9
Parents: 9e78771
Author: Michael G. Noll <mi...@confluent.io>
Authored: Fri Mar 17 15:18:06 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Mar 17 15:18:06 2017 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/SinkNode.java   |  6 +-
 .../processor/internals/SinkNodeTest.java       | 94 +++++++++++++++-----
 2 files changed, 78 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/783900c2/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 94cb1f3..3d4f282 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -77,14 +77,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         try {
             collector.send(topic, key, value, null, timestamp, keySerializer, valSerializer, partitioner);
         } catch (ClassCastException e) {
+            final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
+            final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
             throw new StreamsException(
                     String.format("A serializer (key: %s / value: %s) is not compatible to the actual key or value type " +
                                     "(key type: %s / value type: %s). Change the default Serdes in StreamConfig or " +
                                     "provide correct Serdes via method parameters.",
                                     keySerializer.getClass().getName(),
                                     valSerializer.getClass().getName(),
-                                    key.getClass().getName(),
-                                    value.getClass().getName()),
+                                    keyClass,
+                                    valueClass),
                     e);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/783900c2/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 718120a..dc9129a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -20,52 +20,106 @@ import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
-import java.util.Properties;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
 
 public class SinkNodeTest {
 
-    @Test(expected = StreamsException.class)
+    @Test
     @SuppressWarnings("unchecked")
-    public void invalidInputRecordTimestampTest() {
+    public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {
+        // Given
         final Serializer anySerializer = Serdes.Bytes().serializer();
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+            new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+        final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
+        sink.init(context);
+        final Bytes anyKey = new Bytes("any key".getBytes());
+        final Bytes anyValue = new Bytes("any value".getBytes());
 
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null));
-        context.setTime(-1);
+        // When/Then
+        context.setTime(-1); // ensures a negative timestamp is set for the record we send next
+        try {
+            sink.process(anyKey, anyValue);
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException ignored) {
+        }
+    }
 
-        final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldThrowStreamsExceptionOnKeyValueTypeSerializerMismatch() {
+        // Given
+        final Serializer anySerializer = Serdes.Bytes().serializer();
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+            new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+        context.setTime(0);
+        final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
         sink.init(context);
+        final String keyOfDifferentTypeThanSerializer = "key with different type";
+        final String valueOfDifferentTypeThanSerializer = "value with different type";
 
-        sink.process(null, null);
+        // When/Then
+        try {
+            sink.process(keyOfDifferentTypeThanSerializer, valueOfDifferentTypeThanSerializer);
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException e) {
+            assertThat(e.getCause(), instanceOf(ClassCastException.class));
+        }
     }
 
-    @Test(expected = StreamsException.class)
+    @Test
     @SuppressWarnings("unchecked")
-    public void shouldThrowStreamsExceptionOnKeyValyeTypeSerializerMissmatch() {
+    public void shouldHandleNullKeysWhenThrowingStreamsExceptionOnKeyValueTypeSerializerMismatch() {
+        // Given
         final Serializer anySerializer = Serdes.Bytes().serializer();
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+            new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+        context.setTime(1);
+        final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
+        sink.init(context);
+        final String invalidValueToTriggerSerializerMismatch = "";
 
-        Properties config = new Properties();
-        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
-        context.setTime(0);
+        // When/Then
+        try {
+            sink.process(null, invalidValueToTriggerSerializerMismatch);
+            fail("Should have thrown StreamsException");
+        } catch (final StreamsException e) {
+            assertThat(e.getCause(), instanceOf(ClassCastException.class));
+            assertThat(e.getMessage(), containsString("unknown because key is null"));
+        }
+    }
 
-        final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldHandleNullValuesWhenThrowingStreamsExceptionOnKeyValueTypeSerializerMismatch() {
+        // Given
+        final Serializer anySerializer = Serdes.Bytes().serializer();
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+            new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+        context.setTime(1);
+        final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
         sink.init(context);
+        final String invalidKeyToTriggerSerializerMismatch = "";
 
+        // When/Then
         try {
-            sink.process("", "");
+            sink.process(invalidKeyToTriggerSerializerMismatch, null);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            if (e.getCause() instanceof ClassCastException) {
-                throw e;
-            }
-            throw new RuntimeException(e);
+            assertThat(e.getCause(), instanceOf(ClassCastException.class));
+            assertThat(e.getMessage(), containsString("unknown because value is null"));
         }
     }