You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/17 22:18:09 UTC
kafka git commit: MINOR: Guard against NPE when throwing
StreamsException on serializer mismatch
Repository: kafka
Updated Branches:
refs/heads/trunk 9e787716b -> 783900c25
MINOR: Guard against NPE when throwing StreamsException on serializer mismatch
Author: Michael G. Noll <mi...@confluent.io>
Reviewers: Damian Guy, Guozhang Wang
Closes #2696 from miguno/trunk-sinknode-NPE
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/783900c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/783900c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/783900c2
Branch: refs/heads/trunk
Commit: 783900c259511f86f5af03cbd96f2e74833447b9
Parents: 9e78771
Author: Michael G. Noll <mi...@confluent.io>
Authored: Fri Mar 17 15:18:06 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Mar 17 15:18:06 2017 -0700
----------------------------------------------------------------------
.../streams/processor/internals/SinkNode.java | 6 +-
.../processor/internals/SinkNodeTest.java | 94 +++++++++++++++-----
2 files changed, 78 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/783900c2/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 94cb1f3..3d4f282 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -77,14 +77,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
try {
collector.send(topic, key, value, null, timestamp, keySerializer, valSerializer, partitioner);
} catch (ClassCastException e) {
+ final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
+ final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
throw new StreamsException(
String.format("A serializer (key: %s / value: %s) is not compatible to the actual key or value type " +
"(key type: %s / value type: %s). Change the default Serdes in StreamConfig or " +
"provide correct Serdes via method parameters.",
keySerializer.getClass().getName(),
valSerializer.getClass().getName(),
- key.getClass().getName(),
- value.getClass().getName()),
+ keyClass,
+ valueClass),
e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/783900c2/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 718120a..dc9129a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -20,52 +20,106 @@ import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
-import java.util.Properties;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
public class SinkNodeTest {
- @Test(expected = StreamsException.class)
+ @Test
@SuppressWarnings("unchecked")
- public void invalidInputRecordTimestampTest() {
+ public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {
+ // Given
final Serializer anySerializer = Serdes.Bytes().serializer();
final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+ final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+ new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+ final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
+ sink.init(context);
+ final Bytes anyKey = new Bytes("any key".getBytes());
+ final Bytes anyValue = new Bytes("any value".getBytes());
- final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null));
- context.setTime(-1);
+ // When/Then
+ context.setTime(-1); // ensures a negative timestamp is set for the record we send next
+ try {
+ sink.process(anyKey, anyValue);
+ fail("Should have thrown StreamsException");
+ } catch (final StreamsException ignored) {
+ }
+ }
- final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldThrowStreamsExceptionOnKeyValueTypeSerializerMismatch() {
+ // Given
+ final Serializer anySerializer = Serdes.Bytes().serializer();
+ final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+ final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+ new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+ context.setTime(0);
+ final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
sink.init(context);
+ final String keyOfDifferentTypeThanSerializer = "key with different type";
+ final String valueOfDifferentTypeThanSerializer = "value with different type";
- sink.process(null, null);
+ // When/Then
+ try {
+ sink.process(keyOfDifferentTypeThanSerializer, valueOfDifferentTypeThanSerializer);
+ fail("Should have thrown StreamsException");
+ } catch (final StreamsException e) {
+ assertThat(e.getCause(), instanceOf(ClassCastException.class));
+ }
}
- @Test(expected = StreamsException.class)
+ @Test
@SuppressWarnings("unchecked")
- public void shouldThrowStreamsExceptionOnKeyValyeTypeSerializerMissmatch() {
+ public void shouldHandleNullKeysWhenThrowingStreamsExceptionOnKeyValueTypeSerializerMismatch() {
+ // Given
final Serializer anySerializer = Serdes.Bytes().serializer();
final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+ final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+ new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+ context.setTime(1);
+ final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
+ sink.init(context);
+ final String invalidValueToTriggerSerializerMismatch = "";
- Properties config = new Properties();
- config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
- context.setTime(0);
+ // When/Then
+ try {
+ sink.process(null, invalidValueToTriggerSerializerMismatch);
+ fail("Should have thrown StreamsException");
+ } catch (final StreamsException e) {
+ assertThat(e.getCause(), instanceOf(ClassCastException.class));
+ assertThat(e.getMessage(), containsString("unknown because key is null"));
+ }
+ }
- final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldHandleNullValuesWhenThrowingStreamsExceptionOnKeyValueTypeSerializerMismatch() {
+ // Given
+ final Serializer anySerializer = Serdes.Bytes().serializer();
+ final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+ final MockProcessorContext context = new MockProcessorContext(anyStateSerde,
+ new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+ context.setTime(1);
+ final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
sink.init(context);
+ final String invalidKeyToTriggerSerializerMismatch = "";
+ // When/Then
try {
- sink.process("", "");
+ sink.process(invalidKeyToTriggerSerializerMismatch, null);
+ fail("Should have thrown StreamsException");
} catch (final StreamsException e) {
- if (e.getCause() instanceof ClassCastException) {
- throw e;
- }
- throw new RuntimeException(e);
+ assertThat(e.getCause(), instanceOf(ClassCastException.class));
+ assertThat(e.getMessage(), containsString("unknown because value is null"));
}
}