You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/21 17:47:31 UTC

kafka git commit: MINOR: fix record collector to stick with streams partitioner behavior if it is specified

Repository: kafka
Updated Branches:
  refs/heads/trunk 93fbda4c5 -> 3bedcce01


MINOR: fix record collector to stick with streams partitioner behavior if it is specified

If `partition==null` and `partitioner!=null`, we should not fall back to the default partitioner (as we did before the patch if `producer.partitionsFor(...)` returned an empty list). Falling back to the default partitioner might corrupt hash partitioning.

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Eno Thereska, Damian Guy, Guozhang Wang

Closes #2868 from mjsax/minor-fix-RecordCollector


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bedcce0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bedcce0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bedcce0

Branch: refs/heads/trunk
Commit: 3bedcce01b9d3e8ba0516fa33eb59d57817ce27e
Parents: 93fbda4
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Apr 21 10:47:28 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 21 10:47:28 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/RecordCollector.java    |  26 ++---
 .../internals/RecordCollectorImpl.java          |  72 +++++++------
 .../streams/processor/internals/SinkNode.java   |  34 ++++---
 .../processor/internals/StandbyContextImpl.java |  74 +++++++-------
 .../internals/RecordCollectorTest.java          |  68 ++++++++-----
 .../streams/state/KeyValueStoreTestDriver.java  | 101 +++++++++----------
 .../state/internals/StoreChangeLoggerTest.java  |  29 +++---
 .../apache/kafka/test/KStreamTestDriver.java    |  94 ++++++++---------
 .../apache/kafka/test/NoOpRecordCollector.java  |  29 +++---
 9 files changed, 277 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 1fa5b97..4516f8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -23,22 +23,22 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import java.util.Map;
 
 public interface RecordCollector {
+
     <K, V> void send(final String topic,
-                     K key,
-                     V value,
-                     Integer partition,
-                     Long timestamp,
-                     Serializer<K> keySerializer,
-                     Serializer<V> valueSerializer);
+                     final K key,
+                     final V value,
+                     final Integer partition,
+                     final Long timestamp,
+                     final Serializer<K> keySerializer,
+                     final Serializer<V> valueSerializer);
 
     <K, V> void send(final String topic,
-                     K key,
-                     V value,
-                     Integer partition,
-                     Long timestamp,
-                     Serializer<K> keySerializer,
-                     Serializer<V> valueSerializer,
-                     StreamPartitioner<? super K, ? super V> partitioner);
+                     final K key,
+                     final V value,
+                     final Long timestamp,
+                     final Serializer<K> keySerializer,
+                     final Serializer<V> valueSerializer,
+                     final StreamPartitioner<? super K, ? super V> partitioner);
 
     void flush();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 0122ea0..9d2ac03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -46,54 +46,62 @@ public class RecordCollectorImpl implements RecordCollector {
     private volatile Exception sendException;
 
 
-    public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
+    public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId) {
         this.producer = producer;
-        this.offsets = new HashMap<>();
-        this.logPrefix = String.format("task [%s]", streamTaskId);
+        offsets = new HashMap<>();
+        logPrefix = String.format("task [%s]", streamTaskId);
     }
 
     @Override
     public <K, V> void send(final String topic,
-                            K key,
-                            V value,
-                            Integer partition,
-                            Long timestamp,
-                            Serializer<K> keySerializer,
-                            Serializer<V> valueSerializer) {
-        send(topic, key, value, partition, timestamp, keySerializer, valueSerializer, null);
+                            final K key,
+                            final V value,
+                            final Long timestamp,
+                            final Serializer<K> keySerializer,
+                            final Serializer<V> valueSerializer,
+                            final StreamPartitioner<? super K, ? super V> partitioner) {
+        Integer partition = null;
+
+        if (partitioner != null) {
+            final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+            if (partitions.size() > 0) {
+                partition = partitioner.partition(key, value, partitions.size());
+            } else {
+                throw new StreamsException("Could not get partition information for topic '" + topic + "'." +
+                    " This can happen if the topic does not exist.");
+            }
+        }
+
+        send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
     }
 
     @Override
     public <K, V> void  send(final String topic,
-                             K key,
-                             V value,
-                             Integer partition,
-                             Long timestamp,
-                             Serializer<K> keySerializer,
-                             Serializer<V> valueSerializer,
-                             StreamPartitioner<? super K, ? super V> partitioner) {
+                             final K key,
+                             final V value,
+                             final Integer partition,
+                             final Long timestamp,
+                             final Serializer<K> keySerializer,
+                             final Serializer<V> valueSerializer) {
         checkForException();
-        byte[] keyBytes = keySerializer.serialize(topic, key);
-        byte[] valBytes = valueSerializer.serialize(topic, value);
-        if (partition == null && partitioner != null) {
-            List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
-            if (partitions != null && partitions.size() > 0)
-                partition = partitioner.partition(key, value, partitions.size());
-        }
+        final byte[] keyBytes = keySerializer.serialize(topic, key);
+        final byte[] valBytes = valueSerializer.serialize(topic, value);
 
-        ProducerRecord<byte[], byte[]> serializedRecord =
+        final ProducerRecord<byte[], byte[]> serializedRecord =
                 new ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes);
 
-        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
+        // counting from 1 to make check further down more natural
+        // -> `if (attempt == MAX_SEND_ATTEMPTS)`
+        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; ++attempt) {
             try {
-                this.producer.send(serializedRecord, new Callback() {
+                producer.send(serializedRecord, new Callback() {
                     @Override
-                    public void onCompletion(RecordMetadata metadata, Exception exception) {
+                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                         if (exception == null) {
                             if (sendException != null) {
                                 return;
                             }
-                            TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                            final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                             offsets.put(tp, metadata.offset());
                         } else {
                             if (sendException == null) {
@@ -104,7 +112,7 @@ public class RecordCollectorImpl implements RecordCollector {
                     }
                 });
                 return;
-            } catch (TimeoutException e) {
+            } catch (final TimeoutException e) {
                 if (attempt == MAX_SEND_ATTEMPTS) {
                     throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
                 }
@@ -124,7 +132,7 @@ public class RecordCollectorImpl implements RecordCollector {
     @Override
     public void flush() {
         log.debug("{} Flushing producer", logPrefix);
-        this.producer.flush();
+        producer.flush();
         checkForException();
     }
 
@@ -144,7 +152,7 @@ public class RecordCollectorImpl implements RecordCollector {
      */
     @Override
     public Map<TopicPartition, Long> offsets() {
-        return this.offsets;
+        return offsets;
     }
 
     // for testing only

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 3d4f282..00e5dca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -31,7 +31,11 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
     private ProcessorContext context;
 
-    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
+    public SinkNode(final String name,
+                    final String topic,
+                    final Serializer<K> keySerializer,
+                    final Serializer<V> valSerializer,
+                    final StreamPartitioner<? super K, ? super V> partitioner) {
         super(name);
 
         this.topic = topic;
@@ -44,30 +48,35 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
      * @throws UnsupportedOperationException if this method adds a child to a sink node
      */
     @Override
-    public void addChild(ProcessorNode<?, ?> child) {
+    public void addChild(final ProcessorNode<?, ?> child) {
         throw new UnsupportedOperationException("sink node does not allow addChild");
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void init(ProcessorContext context) {
+    public void init(final ProcessorContext context) {
         super.init(context);
         this.context = context;
 
         // if serializers are null, get the default ones from the context
-        if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerde().serializer();
-        if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerde().serializer();
+        if (keySerializer == null) {
+            keySerializer = (Serializer<K>) context.keySerde().serializer();
+        }
+        if (valSerializer == null) {
+            valSerializer = (Serializer<V>) context.valueSerde().serializer();
+        }
 
         // if value serializers are for {@code Change} values, set the inner serializer when necessary
-        if (this.valSerializer instanceof ChangedSerializer &&
-                ((ChangedSerializer) this.valSerializer).inner() == null)
-            ((ChangedSerializer) this.valSerializer).setInner(context.valueSerde().serializer());
+        if (valSerializer instanceof ChangedSerializer &&
+                ((ChangedSerializer) valSerializer).inner() == null) {
+            ((ChangedSerializer) valSerializer).setInner(context.valueSerde().serializer());
+        }
     }
 
 
     @Override
     public void process(final K key, final V value) {
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+        final RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
 
         final long timestamp = context.timestamp();
         if (timestamp < 0) {
@@ -75,8 +84,8 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         }
 
         try {
-            collector.send(topic, key, value, null, timestamp, keySerializer, valSerializer, partitioner);
-        } catch (ClassCastException e) {
+            collector.send(topic, key, value, timestamp, keySerializer, valSerializer, partitioner);
+        } catch (final ClassCastException e) {
             final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
             final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
             throw new StreamsException(
@@ -102,7 +111,8 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     /**
      * @return a string representation of this node starting with the given indent, useful for debugging.
      */
-    public String toString(String indent) {
+    @Override
+    public String toString(final String indent) {
         final StringBuilder sb = new StringBuilder(super.toString(indent));
         sb.append(indent).append("\ttopic:\t\t");
         sb.append(topic);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 00dbb3c..d738a19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -32,25 +32,22 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     private static final RecordCollector NO_OP_COLLECTOR = new RecordCollector() {
         @Override
         public <K, V> void send(final String topic,
-                                K key,
-                                V value,
-                                Integer partition,
-                                Long timestamp,
-                                Serializer<K> keySerializer,
-                                Serializer<V> valueSerializer) {
+                                final K key,
+                                final V value,
+                                final Integer partition,
+                                final Long timestamp,
+                                final Serializer<K> keySerializer,
+                                final Serializer<V> valueSerializer) {
         }
 
         @Override
         public <K, V> void send(final String topic,
-                                K key,
-                                V value,
-                                Integer partition,
-                                Long timestamp,
-                                Serializer<K> keySerializer,
-                                Serializer<V> valueSerializer,
-                                StreamPartitioner<? super K, ? super V> partitioner) {
-
-        }
+                                final K key,
+                                final V value,
+                                final Long timestamp,
+                                final Serializer<K> keySerializer,
+                                final Serializer<V> valueSerializer,
+                                final StreamPartitioner<? super K, ? super V> partitioner) {}
 
         @Override
         public void flush() {
@@ -69,10 +66,10 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     };
 
     public StandbyContextImpl(final TaskId id,
-                       final String applicationId,
-                       final StreamsConfig config,
-                       final ProcessorStateManager stateMgr,
-                       final StreamsMetrics metrics) {
+                              final String applicationId,
+                              final StreamsConfig config,
+                              final ProcessorStateManager stateMgr,
+                              final StreamsMetrics metrics) {
         super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics));
     }
 
@@ -87,15 +84,15 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public StateStore getStateStore(String name) {
+    public StateStore getStateStore(final String name) {
         throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
     public String topic() {
@@ -103,7 +100,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
     public int partition() {
@@ -111,7 +108,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
     public long offset() {
@@ -119,7 +116,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
     public long timestamp() {
@@ -127,31 +124,31 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public <K, V> void forward(K key, V value) {
+    public <K, V> void forward(final K key, final V value) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public <K, V> void forward(K key, V value, int childIndex) {
+    public <K, V> void forward(final K key, final V value, final int childIndex) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public <K, V> void forward(K key, V value, String childName) {
+    public <K, V> void forward(final K key, final V value, final String childName) {
         throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
     public void commit() {
@@ -159,19 +156,25 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     }
 
     /**
-     * @throws UnsupportedOperationException
+     * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public void schedule(long interval) {
+    public void schedule(final long interval) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }
 
 
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
     @Override
     public RecordContext recordContext() {
         throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks.");
     }
 
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
     @Override
     public void setRecordContext(final RecordContext recordContext) {
         throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks.");
@@ -183,6 +186,9 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         // no-op. can't throw as this is called on commit when the StateStores get flushed.
     }
 
+    /**
+     * @throws UnsupportedOperationException on every invocation
+     */
     @Override
     public ProcessorNode currentNode() {
         throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 5798514..1aa7f38 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -43,13 +43,13 @@ import static org.junit.Assert.assertEquals;
 
 public class RecordCollectorTest {
 
-    private List<PartitionInfo> infos = Arrays.asList(
+    private final List<PartitionInfo> infos = Arrays.asList(
             new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
             new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
+    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos,
             Collections.<String>emptySet(), Collections.<String>emptySet());
 
 
@@ -58,7 +58,7 @@ public class RecordCollectorTest {
 
     private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
         @Override
-        public Integer partition(String key, Object value, int numPartitions) {
+        public Integer partition(final String key, final Object value, final int numPartitions) {
             return Integer.parseInt(key) % numPartitions;
         }
     };
@@ -66,7 +66,7 @@ public class RecordCollectorTest {
     @Test
     public void testSpecificPartition() {
 
-        RecordCollectorImpl collector = new RecordCollectorImpl(
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
                 "RecordCollectorTest-TestSpecificPartition");
 
@@ -79,16 +79,16 @@ public class RecordCollectorTest {
 
         collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
 
-        Map<TopicPartition, Long> offsets = collector.offsets();
+        final Map<TopicPartition, Long> offsets = collector.offsets();
 
         assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 0)));
         assertEquals((Long) 1L, offsets.get(new TopicPartition("topic1", 1)));
         assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
 
         // ignore StreamPartitioner
-        collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", 1, null, stringSerializer, stringSerializer);
+        collector.send("topic1", "999", "0", 2, null, stringSerializer, stringSerializer);
 
         assertEquals((Long) 3L, offsets.get(new TopicPartition("topic1", 0)));
         assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
@@ -98,23 +98,23 @@ public class RecordCollectorTest {
     @Test
     public void testStreamPartitioner() {
 
-        RecordCollectorImpl collector = new RecordCollectorImpl(
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
                 "RecordCollectorTest-TestStreamPartitioner");
 
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "9", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "27", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "81", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "243", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "27", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "81", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "243", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
-        collector.send("topic1", "28", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "82", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "244", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "28", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "82", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "244", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
-        collector.send("topic1", "245", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "245", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
-        Map<TopicPartition, Long> offsets = collector.offsets();
+        final Map<TopicPartition, Long> offsets = collector.offsets();
 
         assertEquals((Long) 4L, offsets.get(new TopicPartition("topic1", 0)));
         assertEquals((Long) 2L, offsets.get(new TopicPartition("topic1", 1)));
@@ -125,7 +125,7 @@ public class RecordCollectorTest {
     @Test
     public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
         final AtomicInteger attempt = new AtomicInteger(0);
-        RecordCollectorImpl collector = new RecordCollectorImpl(
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
                     public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
@@ -137,7 +137,7 @@ public class RecordCollectorTest {
                 },
                 "test");
 
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         final Long offset = collector.offsets().get(new TopicPartition("topic1", 0));
         assertEquals(Long.valueOf(0L), offset);
     }
@@ -145,7 +145,7 @@ public class RecordCollectorTest {
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
-        RecordCollector collector = new RecordCollectorImpl(
+        final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
                     public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
@@ -154,7 +154,7 @@ public class RecordCollectorTest {
                 },
                 "test");
 
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
     }
 
@@ -170,8 +170,8 @@ public class RecordCollectorTest {
                     }
                 },
                 "test");
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
 
     @SuppressWarnings("unchecked")
@@ -186,7 +186,7 @@ public class RecordCollectorTest {
                     }
                 },
                 "test");
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         collector.flush();
     }
 
@@ -202,8 +202,22 @@ public class RecordCollectorTest {
                     }
                 },
                 "test");
-        collector.send("topic1", "3", "0", null, null, stringSerializer, stringSerializer, streamPartitioner);
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         collector.close();
     }
 
+    @SuppressWarnings("unchecked")
+    @Test(expected = StreamsException.class)
+    public void shouldThrowIfTopicIsUnknown() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                @Override
+                public List<PartitionInfo> partitionsFor(final String topic) {
+                    return Collections.EMPTY_LIST;
+                }
+
+            },
+            "test");
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index b758799..a6804e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -45,7 +45,7 @@ import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -154,9 +154,9 @@ public class KeyValueStoreTestDriver<K, V> {
      *            {@code Long.class}, or {@code byte[].class}
      * @return the test driver; never null
      */
-    public static <K, V> KeyValueStoreTestDriver<K, V> create(Class<K> keyClass, Class<V> valueClass) {
-        StateSerdes<K, V> serdes = StateSerdes.withBuiltinTypes("unexpected", keyClass, valueClass);
-        return new KeyValueStoreTestDriver<K, V>(serdes);
+    public static <K, V> KeyValueStoreTestDriver<K, V> create(final Class<K> keyClass, final Class<V> valueClass) {
+        final StateSerdes<K, V> serdes = StateSerdes.withBuiltinTypes("unexpected", keyClass, valueClass);
+        return new KeyValueStoreTestDriver<>(serdes);
     }
 
     /**
@@ -175,11 +175,11 @@ public class KeyValueStoreTestDriver<K, V> {
                                                               final Deserializer<K> keyDeserializer,
                                                               final Serializer<V> valueSerializer,
                                                               final Deserializer<V> valueDeserializer) {
-        StateSerdes<K, V> serdes = new StateSerdes<K, V>(
+        final StateSerdes<K, V> serdes = new StateSerdes<>(
             "unexpected",
             Serdes.serdeFrom(keySerializer, keyDeserializer),
             Serdes.serdeFrom(valueSerializer, valueDeserializer));
-        return new KeyValueStoreTestDriver<K, V>(serdes);
+        return new KeyValueStoreTestDriver<>(serdes);
     }
 
     private final Map<K, V> flushedEntries = new HashMap<>();
@@ -187,53 +187,50 @@ public class KeyValueStoreTestDriver<K, V> {
     private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
     private final MockProcessorContext context;
     private final Map<String, StateStore> storeMap = new HashMap<>();
-    private MockTime time = new MockTime();
-    private MetricConfig config = new MetricConfig();
-    private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time, true);
+    private final MockTime time = new MockTime();
+    private final MetricConfig config = new MetricConfig();
+    private final Metrics metrics = new Metrics(config, Collections.singletonList((MetricsReporter) new JmxReporter()), time, true);
 
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1 * 1024 * 1024L;
     private final ThreadCache cache = new ThreadCache("testCache", DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
     private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
-    private final RecordCollector recordCollector;
     private File stateDir = null;
 
-    protected KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
-        ByteArraySerializer rawSerializer = new ByteArraySerializer();
-        Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
+    private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
+        final ByteArraySerializer rawSerializer = new ByteArraySerializer();
+        final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
-        this.recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
+        final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
             @SuppressWarnings("unchecked")
             @Override
             public <K1, V1> void send(final String topic,
-                                      K1 key,
-                                      V1 value,
-                                      Integer partition,
-                                      Long timestamp,
-                                      Serializer<K1> keySerializer,
-                                      Serializer<V1> valueSerializer) {
+                                      final K1 key,
+                                      final V1 value,
+                                      final Integer partition,
+                                      final Long timestamp,
+                                      final Serializer<K1> keySerializer,
+                                      final Serializer<V1> valueSerializer) {
             // for byte arrays we need to wrap it for comparison
 
-                K keyTest = serdes.keyFrom(keySerializer.serialize(topic, key));
-                V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, value));
+                final K keyTest = serdes.keyFrom(keySerializer.serialize(topic, key));
+                final V valueTest = serdes.valueFrom(valueSerializer.serialize(topic, value));
 
                 recordFlushed(keyTest, valueTest);
             }
 
             @Override
             public <K1, V1> void send(final String topic,
-                                      K1 key,
-                                      V1 value,
-                                      Integer partition,
-                                      Long timestamp,
-                                      Serializer<K1> keySerializer,
-                                      Serializer<V1> valueSerializer,
-                                      StreamPartitioner<? super K1, ? super V1> partitioner) {
-                // ignore partitioner
-                send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
+                                      final K1 key,
+                                      final V1 value,
+                                      final Long timestamp,
+                                      final Serializer<K1> keySerializer,
+                                      final Serializer<V1> valueSerializer,
+                                      final StreamPartitioner<? super K1, ? super V1> partitioner) {
+                throw new UnsupportedOperationException();
             }
         };
-        this.stateDir = TestUtils.tempDirectory();
-        this.stateDir.mkdirs();
+        stateDir = TestUtils.tempDirectory();
+        stateDir.mkdirs();
 
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
@@ -244,25 +241,25 @@ public class KeyValueStoreTestDriver<K, V> {
 
 
 
-        this.context = new MockProcessorContext(this.stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+        context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
             @Override
             public TaskId taskId() {
                 return new TaskId(0, 1);
             }
 
             @Override
-            public <K1, V1> void forward(K1 key, V1 value, int childIndex) {
+            public <K1, V1> void forward(final K1 key, final V1 value, final int childIndex) {
                 forward(key, value);
             }
 
             @Override
-            public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
+            public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback func) {
                 storeMap.put(store.name(), store);
                 restoreEntries(func, serdes);
             }
 
             @Override
-            public StateStore getStateStore(String name) {
+            public StateStore getStateStore(final String name) {
                 return storeMap.get(name);
             }
 
@@ -282,7 +279,7 @@ public class KeyValueStoreTestDriver<K, V> {
             }
 
             @Override
-            public Map<String, Object> appConfigsWithPrefix(String prefix) {
+            public Map<String, Object> appConfigsWithPrefix(final String prefix) {
                 return new StreamsConfig(props).originalsWithPrefix(prefix);
             }
 
@@ -298,7 +295,7 @@ public class KeyValueStoreTestDriver<K, V> {
         };
     }
 
-    protected void recordFlushed(K key, V value) {
+    private void recordFlushed(final K key, final V value) {
         if (value == null) {
             // This is a removal ...
             flushedRemovals.add(key);
@@ -310,11 +307,11 @@ public class KeyValueStoreTestDriver<K, V> {
         }
     }
 
-    private void restoreEntries(StateRestoreCallback func, StateSerdes<K, V> serdes) {
-        for (KeyValue<K, V> entry : restorableEntries) {
+    private void restoreEntries(final StateRestoreCallback func, final StateSerdes<K, V> serdes) {
+        for (final KeyValue<K, V> entry : restorableEntries) {
             if (entry != null) {
-                byte[] rawKey = serdes.rawKey(entry.key);
-                byte[] rawValue = serdes.rawValue(entry.value);
+                final byte[] rawKey = serdes.rawKey(entry.key);
+                final byte[] rawValue = serdes.rawValue(entry.value);
                 func.restore(rawKey, rawValue);
             }
         }
@@ -349,8 +346,8 @@ public class KeyValueStoreTestDriver<K, V> {
      * @param value the value for the entry
      * @see #checkForRestoredEntries(KeyValueStore)
      */
-    public void addEntryToRestoreLog(K key, V value) {
-        restorableEntries.add(new KeyValue<K, V>(key, value));
+    public void addEntryToRestoreLog(final K key, final V value) {
+        restorableEntries.add(new KeyValue<>(key, value));
     }
 
     /**
@@ -386,11 +383,11 @@ public class KeyValueStoreTestDriver<K, V> {
      * @return the number of restore entries missing from the store, or 0 if all restore entries were found
      * @see #addEntryToRestoreLog(Object, Object)
      */
-    public int checkForRestoredEntries(KeyValueStore<K, V> store) {
+    public int checkForRestoredEntries(final KeyValueStore<K, V> store) {
         int missing = 0;
-        for (KeyValue<K, V> kv : restorableEntries) {
+        for (final KeyValue<K, V> kv : restorableEntries) {
             if (kv != null) {
-                V value = store.get(kv.key);
+                final V value = store.get(kv.key);
                 if (!Objects.equals(value, kv.value)) {
                     ++missing;
                 }
@@ -405,7 +402,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @param store the key value store using this {@link #context()}.
      * @return the number of entries
      */
-    public int sizeOf(KeyValueStore<K, V> store) {
+    public int sizeOf(final KeyValueStore<K, V> store) {
         int size = 0;
         try (KeyValueIterator<K, V> iterator = store.all()) {
             while (iterator.hasNext()) {
@@ -421,9 +418,9 @@ public class KeyValueStoreTestDriver<K, V> {
      *
      * @param key the key
      * @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this
-     *         key was {@link #flushedEntryStored(Object) removed} upon flush
+     *         key was removed upon flush
      */
-    public V flushedEntryStored(K key) {
+    public V flushedEntryStored(final K key) {
         return flushedEntries.get(key);
     }
 
@@ -434,7 +431,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not
      *         removed when last flushed
      */
-    public boolean flushedEntryRemoved(K key) {
+    public boolean flushedEntryRemoved(final K key) {
         return flushedRemovals.contains(key);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 311eaf6..515fef6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -35,33 +35,30 @@ public class StoreChangeLoggerTest {
     private final String topic = "topic";
 
     private final Map<Integer, String> logged = new HashMap<>();
-    private final Map<Integer, String> written = new HashMap<>();
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
             new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
                 @SuppressWarnings("unchecked")
                 @Override
                 public <K1, V1> void send(final String topic,
-                                          K1 key,
-                                          V1 value,
-                                          Integer partition,
-                                          Long timestamp,
-                                          Serializer<K1> keySerializer,
-                                          Serializer<V1> valueSerializer) {
+                                          final K1 key,
+                                          final V1 value,
+                                          final Integer partition,
+                                          final Long timestamp,
+                                          final Serializer<K1> keySerializer,
+                                          final Serializer<V1> valueSerializer) {
                     logged.put((Integer) key, (String) value);
                 }
 
                 @Override
                 public <K1, V1> void send(final String topic,
-                                           K1 key,
-                                           V1 value,
-                                           Integer partition,
-                                           Long timestamp,
-                                           Serializer<K1> keySerializer,
-                                           Serializer<V1> valueSerializer,
-                                           StreamPartitioner<? super K1, ? super V1> partitioner) {
-                    // ignore partitioner
-                    send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
+                                          final K1 key,
+                                          final V1 value,
+                                          final Long timestamp,
+                                          final Serializer<K1> keySerializer,
+                                          final Serializer<V1> valueSerializer,
+                                          final StreamPartitioner<? super K1, ? super V1> partitioner) {
+                    throw new UnsupportedOperationException();
                 }
             }
     );

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index f79da31..5113285 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -46,36 +46,36 @@ public class KStreamTestDriver {
     private final MockProcessorContext context;
     private final ProcessorTopology globalTopology;
 
-    public KStreamTestDriver(KStreamBuilder builder) {
+    public KStreamTestDriver(final KStreamBuilder builder) {
         this(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    public KStreamTestDriver(KStreamBuilder builder, File stateDir) {
+    public KStreamTestDriver(final KStreamBuilder builder, final File stateDir) {
         this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    public KStreamTestDriver(KStreamBuilder builder, File stateDir, final long cacheSize) {
+    public KStreamTestDriver(final KStreamBuilder builder, final File stateDir, final long cacheSize) {
         this(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
     }
 
-    public KStreamTestDriver(KStreamBuilder builder,
-                             File stateDir,
-                             Serde<?> keySerde,
-                             Serde<?> valSerde) {
+    public KStreamTestDriver(final KStreamBuilder builder,
+                             final File stateDir,
+                             final Serde<?> keySerde,
+                             final Serde<?> valSerde) {
         this(builder, stateDir, keySerde, valSerde, DEFAULT_CACHE_SIZE_BYTES);
     }
 
-    public KStreamTestDriver(KStreamBuilder builder,
-                             File stateDir,
-                             Serde<?> keySerde,
-                             Serde<?> valSerde,
-                             long cacheSize) {
+    public KStreamTestDriver(final KStreamBuilder builder,
+                             final File stateDir,
+                             final Serde<?> keySerde,
+                             final Serde<?> valSerde,
+                             final long cacheSize) {
         builder.setApplicationId("TestDriver");
-        this.topology = builder.build(null);
-        this.globalTopology = builder.buildGlobalStateTopology();
-        ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
-        this.context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
-        this.context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
+        topology = builder.build(null);
+        globalTopology = builder.buildGlobalStateTopology();
+        final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
+        context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
         // init global topology first as it will add stores to the
         // store map that are required for joins etc.
         if (globalTopology != null) {
@@ -85,11 +85,11 @@ public class KStreamTestDriver {
     }
 
     private void initTopology(final ProcessorTopology topology, final List<StateStore> stores) {
-        for (StateStore store : stores) {
+        for (final StateStore store : stores) {
             store.init(context, store);
         }
 
-        for (ProcessorNode node : topology.processors()) {
+        for (final ProcessorNode node : topology.processors()) {
             context.setCurrentNode(node);
             try {
                 node.init(context);
@@ -107,7 +107,7 @@ public class KStreamTestDriver {
         return context;
     }
 
-    public void process(String topicName, Object key, Object value) {
+    public void process(final String topicName, final Object key, final Object value) {
         final ProcessorNode prevNode = context.currentNode();
         ProcessorNode currNode = topology.source(topicName);
         if (currNode == null && globalTopology != null) {
@@ -128,9 +128,9 @@ public class KStreamTestDriver {
         }
     }
 
-    public void punctuate(long timestamp) {
+    public void punctuate(final long timestamp) {
         final ProcessorNode prevNode = context.currentNode();
-        for (ProcessorNode processor : topology.processors()) {
+        for (final ProcessorNode processor : topology.processors()) {
             if (processor.processor() != null) {
                 context.setRecordContext(createRecordContext(timestamp));
                 context.setCurrentNode(processor);
@@ -143,13 +143,13 @@ public class KStreamTestDriver {
         }
     }
 
-    public void setTime(long timestamp) {
+    public void setTime(final long timestamp) {
         context.setTime(timestamp);
     }
 
     public void close() {
         // close all processors
-        for (ProcessorNode node : topology.processors()) {
+        for (final ProcessorNode node : topology.processors()) {
             context.setCurrentNode(node);
             try {
                 node.close();
@@ -162,23 +162,24 @@ public class KStreamTestDriver {
     }
 
     public Set<String> allProcessorNames() {
-        Set<String> names = new HashSet<>();
+        final Set<String> names = new HashSet<>();
 
-        List<ProcessorNode> nodes = topology.processors();
+        final List<ProcessorNode> nodes = topology.processors();
 
-        for (ProcessorNode node: nodes) {
+        for (final ProcessorNode node: nodes) {
             names.add(node.name());
         }
 
         return names;
     }
 
-    public ProcessorNode processor(String name) {
-        List<ProcessorNode> nodes = topology.processors();
+    public ProcessorNode processor(final String name) {
+        final List<ProcessorNode> nodes = topology.processors();
 
-        for (ProcessorNode node: nodes) {
-            if (node.name().equals(name))
+        for (final ProcessorNode node: nodes) {
+            if (node.name().equals(name)) {
                 return node;
+            }
         }
 
         return null;
@@ -189,7 +190,7 @@ public class KStreamTestDriver {
     }
 
     public void flushState() {
-        for (StateStore stateStore : context.allStateStores().values()) {
+        for (final StateStore stateStore : context.allStateStores().values()) {
             stateStore.flush();
         }
     }
@@ -199,12 +200,12 @@ public class KStreamTestDriver {
         // of them since the flushing could cause eviction and hence tries to access other stores
         flushState();
 
-        for (StateStore stateStore : context.allStateStores().values()) {
+        for (final StateStore stateStore : context.allStateStores().values()) {
             stateStore.close();
         }
     }
 
-    private ProcessorRecordContext createRecordContext(long timestamp) {
+    private ProcessorRecordContext createRecordContext(final long timestamp) {
         return new ProcessorRecordContext(timestamp, -1, -1, "topic");
     }
 
@@ -215,25 +216,24 @@ public class KStreamTestDriver {
 
         @Override
         public <K, V> void send(final String topic,
-                                K key,
-                                V value,
-                                Integer partition,
-                                Long timestamp,
-                                Serializer<K> keySerializer,
-                                Serializer<V> valueSerializer,
-                                StreamPartitioner<? super K, ? super V> partitioner) {
+                                final K key,
+                                final V value,
+                                final Long timestamp,
+                                final Serializer<K> keySerializer,
+                                final Serializer<V> valueSerializer,
+                                final StreamPartitioner<? super K, ? super V> partitioner) {
             // The serialization is skipped.
             process(topic, key, value);
         }
 
         @Override
         public <K, V> void send(final String topic,
-                                K key,
-                                V value,
-                                Integer partition,
-                                Long timestamp,
-                                Serializer<K> keySerializer,
-                                Serializer<V> valueSerializer) {
+                                final K key,
+                                final V value,
+                                final Integer partition,
+                                final Long timestamp,
+                                final Serializer<K> keySerializer,
+                                final Serializer<V> valueSerializer) {
         // The serialization is skipped.
             process(topic, key, value);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3bedcce0/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 3349ae1..05175f9 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -28,26 +28,21 @@ public class NoOpRecordCollector implements RecordCollector {
 
     @Override
     public <K, V> void send(final String topic,
-                            K key,
-                            V value,
-                            Integer partition,
-                            Long timestamp,
-                            Serializer<K> keySerializer,
-                            Serializer<V> valueSerializer) {
-    // no-op
-    }
+                            final K key,
+                            final V value,
+                            final Integer partition,
+                            final Long timestamp,
+                            final Serializer<K> keySerializer,
+                            final Serializer<V> valueSerializer) {}
 
     @Override
     public <K, V> void send(final String topic,
-                                K key,
-                                V value,
-                                Integer partition,
-                                Long timestamp,
-                                Serializer<K> keySerializer,
-                                Serializer<V> valueSerializer,
-                                StreamPartitioner<? super K, ? super V> partitioner) {
-        // no-op
-    }
+                            final K key,
+                            final V value,
+                            final Long timestamp,
+                            final Serializer<K> keySerializer,
+                            final Serializer<V> valueSerializer,
+                            final StreamPartitioner<? super K, ? super V> partitioner) {}
 
     @Override
     public void flush() {