Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/07 23:46:08 UTC

kafka git commit: KAFKA-2649: Add support for custom partitioning in topology sinks

Repository: kafka
Updated Branches:
  refs/heads/trunk ee1770e00 -> 4836e525c


KAFKA-2649: Add support for custom partitioning in topology sinks

Added option to use custom partitioning logic within each topology sink.

Author: Randall Hauch <rh...@gmail.com>

Reviewers: Guozhang Wang

Closes #309 from rhauch/kafka-2649


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4836e525
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4836e525
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4836e525

Branch: refs/heads/trunk
Commit: 4836e525c851640e0da9d8d11321621a2c70e8f0
Parents: ee1770e
Author: Randall Hauch <rh...@gmail.com>
Authored: Thu Jan 7 14:46:02 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jan 7 14:46:02 2016 -0800

----------------------------------------------------------------------
 .../streams/processor/StreamPartitioner.java    | 59 ++++++++++++++++
 .../streams/processor/TopologyBuilder.java      | 73 +++++++++++++++++++-
 .../processor/internals/RecordCollector.java    | 17 ++++-
 .../streams/processor/internals/SinkNode.java   |  7 +-
 .../internals/ProcessorTopologyTest.java        | 45 ++++++++----
 .../streams/state/KeyValueStoreTestDriver.java  |  6 ++
 .../apache/kafka/test/KStreamTestDriver.java    | 11 +++
 .../kafka/test/ProcessorTopologyTestDriver.java |  7 +-
 8 files changed, 203 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4836e525/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
new file mode 100644
index 0000000..f14d9d9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+/**
+ * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's
+ * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition.
+ * <p>
+ * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it,
+ * using multiple partitions allows a topic to scale beyond a size that will fit on a single machine. Partitions also enable you
+ * to use multiple instances of your topology to process all of the messages on the topology's source topics in parallel.
+ * <p>
+ * When a topology is instantiated, each of its sources is assigned a subset of the partitions of its source topic. That means
+ * that only the processors in that topology instance will consume the messages from those partitions. In many cases, Kafka
+ * Streams will automatically manage these instances, adjusting as topology instances are added or removed.
+ * <p>
+ * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have
+ * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance.
+ * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently
+ * determine to which partition each message should be written.
+ * <p>
+ * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner
+ * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink}
+ * for that topic.
+ * <p>
+ * All StreamPartitioner implementations should be stateless, pure functions so that they can be shared across topics and sink nodes.
+ * 
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer,
+ *      org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...)
+ * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...)
+ */
+public interface StreamPartitioner<K, V> {
+
+    /**
+     * Determine the partition number for a message with the given key and value, given the current number of partitions in the topic.
+     * 
+     * @param key the key of the message
+     * @param value the value of the message
+     * @param numPartitions the total number of partitions
+     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+     */
+    Integer partition(K key, V value, int numPartitions);
+}
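
For illustration, a minimal implementation of the new interface might look like the following sketch (not part of this patch; the class name is made up). It routes by key hash so that all messages with the same key land on the same partition, and returns null to defer to the producer's default partitioning when there is no key:

    import org.apache.kafka.streams.processor.StreamPartitioner;

    public class KeyHashStreamPartitioner implements StreamPartitioner<String, String> {
        @Override
        public Integer partition(String key, String value, int numPartitions) {
            if (key == null)
                return null; // no key: fall back to the default partitioning logic
            // mask the sign bit so the result falls in [0, numPartitions)
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }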

http://git-wip-us.apache.org/repos/asf/kafka/blob/4836e525/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 3cfb22b..9cd80a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -132,19 +132,21 @@ public class TopologyBuilder {
         public final String topic;
         private Serializer keySerializer;
         private Serializer valSerializer;
+        private final StreamPartitioner partitioner;
 
-        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer) {
+        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
             super(name);
             this.parents = parents.clone();
             this.topic = topic;
             this.keySerializer = keySerializer;
             this.valSerializer = valSerializer;
+            this.partitioner = partitioner;
         }
 
         @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build() {
-            return new SinkNode(name, topic, keySerializer, valSerializer);
+            return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
         }
     }
 
@@ -235,15 +237,53 @@ public class TopologyBuilder {
      *
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
+     * @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
+     * and write to its topic
      * @return this builder instance so methods can be chained together; never null
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
     public final TopologyBuilder addSink(String name, String topic, String... parentNames) {
         return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames);
     }
 
     /**
+     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic, using
+     * the supplied partitioner.
+     * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+     * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamingConfig streaming configuration}.
+     * <p>
+     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+     * the named Kafka topic's partitions. Such control is often useful with topologies that use
+     * {@link #addStateStore(StateStoreSupplier, String...) state stores}
+     * in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically
+     * distribute messages among partitions using its default partitioning logic.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its messages
+     * @param partitioner the function that should be used to determine the partition for each message processed by the sink
+     * @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
+     * and write to its topic
+     * @return this builder instance so methods can be chained together; never null
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
+     */
+    public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) {
+        return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames);
+    }
+
+    /**
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
      * The sink will use the specified key and value serializers.
+     * <p>
+     * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among
+     * the named Kafka topic's partitions. Such control is often useful with topologies that use
+     * {@link #addStateStore(StateStoreSupplier, String...) state stores}
+     * in their processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically
+     * distribute messages among partitions using its default partitioning logic.
      *
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
@@ -256,8 +296,35 @@ public class TopologyBuilder {
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...)
      */
     public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) {
+        return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames);
+    }
+
+    /**
+     * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
+     * The sink will use the specified key and value serializers, and the supplied partitioner.
+     *
+     * @param name the unique name of the sink
+     * @param topic the name of the Kafka topic to which this sink should write its messages
+     * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
+     * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
+     * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link StreamingConfig streaming configuration}
+     * @param partitioner the function that should be used to determine the partition for each message processed by the sink
+     * @param parentNames the name of one or more source or processor nodes whose output messages this sink should consume
+     * and write to its topic
+     * @return this builder instance so methods can be chained together; never null
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     */
+    public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) {
         if (nodeFactories.containsKey(name))
             throw new TopologyException("Processor " + name + " is already added.");
 
@@ -272,7 +339,7 @@ public class TopologyBuilder {
             }
         }
 
-        nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer));
+        nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer, partitioner));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, parentNames);
         return this;
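
Taken together, a sink with a custom partitioner can be wired up roughly as shown below (a sketch only: the topic and node names are made up, and KeyHashStreamPartitioner refers to the illustrative implementation sketched earlier). The shorter addSink(String, String, StreamPartitioner, String...) overload behaves the same way but uses the default serializers from the streaming configuration:

    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source", "input-topic")
           .addSink("sink", "output-topic",
                    new StringSerializer(), new StringSerializer(),
                    new KeyHashStreamPartitioner(),   // custom StreamPartitioner
                    "source");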

http://git-wip-us.apache.org/repos/asf/kafka/blob/4836e525/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index f0dbf35..fe0472e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -21,12 +21,15 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class RecordCollector {
@@ -47,6 +50,7 @@ public class RecordCollector {
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final Callback callback = new Callback() {
+        @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (exception == null) {
                 TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
@@ -64,9 +68,20 @@ public class RecordCollector {
     }
 
     public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        send(record, keySerializer, valueSerializer, null);
+    }
+
+    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+                            StreamPartitioner<K, V> partitioner) {
         byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
         byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
-        this.producer.send(new ProducerRecord<>(record.topic(), keyBytes, valBytes), callback);
+        Integer partition = null;
+        if (partitioner != null) {
+            List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
+            if (partitions != null)
+                partition = partitioner.partition(record.key(), record.value(), partitions.size());
+        }
+        this.producer.send(new ProducerRecord<>(record.topic(), partition, keyBytes, valBytes), callback);
     }
 
     public void flush() {
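
The partition-resolution rule introduced above can be summarized in a small stand-alone sketch (illustrative only, not part of the patch): a non-null partitioner with available partition metadata chooses the partition, and everything else yields null, which leaves the choice to the producer's default partitioner:

    import java.util.List;

    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.streams.processor.StreamPartitioner;

    final class PartitionResolution {
        // Returns an explicit partition for one record, or null to let the
        // producer's default partitioner decide.
        static <K, V> Integer resolve(StreamPartitioner<K, V> partitioner,
                                      K key, V value, List<PartitionInfo> partitions) {
            if (partitioner == null || partitions == null)
                return null;
            return partitioner.partition(key, value, partitions.size());
        }
    }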

http://git-wip-us.apache.org/repos/asf/kafka/blob/4836e525/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 9f01727..7ab59ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -20,21 +20,24 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 
 public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
     private final String topic;
     private Serializer<K> keySerializer;
     private Serializer<V> valSerializer;
+    private final StreamPartitioner<K, V> partitioner;
 
     private ProcessorContext context;
 
-    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
+    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) {
         super(name);
 
         this.topic = topic;
         this.keySerializer = keySerializer;
         this.valSerializer = valSerializer;
+        this.partitioner = partitioner;
     }
 
     @Override
@@ -54,7 +57,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
     public void process(K key, V value) {
         // send to all the registered topics
         RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer);
+        collector.send(new ProducerRecord<>(topic, key, value), keySerializer, valSerializer, partitioner);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/4836e525/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 2f359bc..f2ef2ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -114,22 +115,23 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testDrivingSimpleTopology() {
-        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology());
+        int partition = 10;
+        driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition));
         driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
         driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
         driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
         driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3");
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4");
-        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4", partition);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5", partition);
     }
 
     @Test
@@ -172,25 +174,38 @@ public class ProcessorTopologyTest {
     }
 
     protected void assertNextOutputRecord(String topic, String key, String value) {
-        assertProducerRecord(driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER), topic, key, value);
+        ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
+        assertEquals(topic, record.topic());
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+        assertNull(record.partition());
+    }
+
+    protected void assertNextOutputRecord(String topic, String key, String value, Integer partition) {
+        ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER);
+        assertEquals(topic, record.topic());
+        assertEquals(key, record.key());
+        assertEquals(value, record.value());
+        assertEquals(partition, record.partition());
     }
 
     protected void assertNoOutputRecord(String topic) {
         assertNull(driver.readOutput(topic));
     }
 
-    private void assertProducerRecord(ProducerRecord<String, String> record, String topic, String key, String value) {
-        assertEquals(topic, record.topic());
-        assertEquals(key, record.key());
-        assertEquals(value, record.value());
-        // Kafka Streaming doesn't set the partition, so it's always null
-        assertNull(record.partition());
+    protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) {
+        return new StreamPartitioner<K, V>() {
+            @Override
+            public Integer partition(K key, V value, int numPartitions) {
+                return partition;
+            }
+        };
     }
 
-    protected TopologyBuilder createSimpleTopology() {
+    protected TopologyBuilder createSimpleTopology(int partition) {
         return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
                                     .addProcessor("processor", define(new ForwardingProcessor()), "source")
-                                    .addSink("sink", OUTPUT_TOPIC_1, "processor");
+                                    .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
     }
 
     protected TopologyBuilder createMultiplexingTopology() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4836e525/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 6f74da8..108797a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.test.MockProcessorContext;
@@ -245,6 +246,11 @@ public class KeyValueStoreTestDriver<K, V> {
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                 recordFlushed(record.key(), record.value());
             }
+            @Override
+            public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
+                                    StreamPartitioner<K1, V1> partitioner) {
+                recordFlushed(record.key(), record.value());
+            }
         };
         this.stateDir = StateUtils.tempDir();
         this.stateDir.mkdirs();

http://git-wip-us.apache.org/repos/asf/kafka/blob/4836e525/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 5275545..2dc567e 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -126,15 +127,25 @@ public class KStreamTestDriver {
         public MockRecordCollector() {
             super(null);
         }
+        
+        @Override
+        public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+                                StreamPartitioner<K, V> partitioner) {
+            // The serialization is skipped.
+            process(record.topic(), record.key(), record.value());
+        }
 
+        @Override
         public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
             // The serialization is skipped.
             process(record.topic(), record.key(), record.value());
         }
 
+        @Override
         public void flush() {
         }
 
+        @Override
         public void close() {
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4836e525/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 879c172..eaeed09 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -149,7 +149,12 @@ public class ProcessorTopologyTestDriver {
 
         // Set up the consumer and producer ...
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        producer = new MockProducer<>(true, bytesSerializer, bytesSerializer);
+        producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
+            @Override
+            public List<PartitionInfo> partitionsFor(String topic) {
+                return Collections.emptyList();
+            }
+        };
         restoreStateConsumer = createRestoreConsumer(id, storeNames);
 
         // Set up all of the topic+partition information and subscribe the consumer to each ...