You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/06 20:09:54 UTC

[1/2] kafka git commit: KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without any locks

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 f46032f64 -> abfee8549


KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without any locks

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma, Jun Rao, Jiangjie Qin, Guozhang Wang

Closes #2195 from hachikuji/KAFKA-3994-linked-queue


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/02e75a27
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/02e75a27
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/02e75a27

Branch: refs/heads/0.10.1
Commit: 02e75a27ce1b2a34a54142cfd3c6bf0931c94c10
Parents: f46032f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Dec 5 11:12:03 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 6 11:56:19 2016 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/DelayedHeartbeat.scala    |  5 ++
 .../scala/kafka/coordinator/DelayedJoin.scala   |  8 +-
 .../scala/kafka/server/DelayedOperation.scala   | 93 +++++++++++---------
 .../other/kafka/TestPurgatoryPerformance.scala  |  1 -
 .../kafka/server/DelayedOperationTest.scala     |  4 +-
 5 files changed, 62 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index 8e250c3..b05186c 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -29,6 +29,11 @@ private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                             heartbeatDeadline: Long,
                                             sessionTimeout: Long)
   extends DelayedOperation(sessionTimeout) {
+
+  // overridden since tryComplete already synchronizes on the group. This makes it safe to
+  // call purgatory operations while holding the group lock.
+  override def safeTryComplete(): Boolean = tryComplete()
+
   override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
   override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
index a62884a..8744f16 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
@@ -31,8 +31,12 @@ import kafka.server.DelayedOperation
  */
 private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
                                        group: GroupMetadata,
-                                       sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout) {
+                                       rebalanceTimeout: Long)
+  extends DelayedOperation(rebalanceTimeout) {
+
+  // overridden since tryComplete already synchronizes on the group. This makes it safe to
+  // call purgatory operations while holding the group lock.
+  override def safeTryComplete(): Boolean = tryComplete()
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
   override def onExpiration() = coordinator.onExpireJoin()

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 5248edf..553c27a 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -17,22 +17,18 @@
 
 package kafka.server
 
-import kafka.utils._
-import kafka.utils.timer._
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.metrics.KafkaMetricsGroup
-
-import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection._
-
 import com.yammer.metrics.core.Gauge
 
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils._
+import kafka.utils.timer._
+
+import scala.collection._
 
 /**
  * An operation whose processing needs to be delayed for at most the given delayMs. For example
@@ -77,7 +73,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
   /**
    * Check if the delayed operation is already completed
    */
-  def isCompleted(): Boolean = completed.get()
+  def isCompleted: Boolean = completed.get()
 
   /**
    * Call-back to execute when a delayed operation gets expired and hence forced to complete.
@@ -90,7 +86,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
    */
   def onComplete(): Unit
 
-  /*
+  /**
    * Try to complete the delayed operation by first checking if the operation
    * can be completed by now. If yes execute the completion logic by calling
    * forceComplete() and return true iff forceComplete returns true; otherwise return false
@@ -99,6 +95,16 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
    */
   def tryComplete(): Boolean
 
+  /**
+   * Thread-safe variant of tryComplete(). This can be overridden if the operation provides its
+   * own synchronization.
+   */
+  def safeTryComplete(): Boolean = {
+    synchronized {
+      tryComplete()
+    }
+  }
+
   /*
    * run() method defines a task that is executed on timeout
    */
@@ -187,14 +193,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
     // operation is unnecessarily added for watch. However, this is a less severe issue since the
     // expire reaper will clean it up periodically.
 
-    var isCompletedByMe = operation synchronized operation.tryComplete()
+    var isCompletedByMe = operation.safeTryComplete()
     if (isCompletedByMe)
       return true
 
     var watchCreated = false
     for(key <- watchKeys) {
       // If the operation is already completed, stop adding it to the rest of the watcher list.
-      if (operation.isCompleted())
+      if (operation.isCompleted)
         return false
       watchForOperation(key, operation)
 
@@ -204,14 +210,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
       }
     }
 
-    isCompletedByMe = operation synchronized operation.tryComplete()
+    isCompletedByMe = operation.safeTryComplete()
     if (isCompletedByMe)
       return true
 
     // if it cannot be completed by now and hence is watched, add to the expire queue also
-    if (! operation.isCompleted()) {
+    if (!operation.isCompleted) {
       timeoutTimer.add(operation)
-      if (operation.isCompleted()) {
+      if (operation.isCompleted) {
         // cancel the timer task
         operation.cancel()
       }
@@ -239,7 +245,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
    * on multiple lists, and some of its watched entries may still be in the watch lists
    * even when it has been completed, this number may be larger than the number of real operations watched
    */
-  def watched() = allWatchers.map(_.watched).sum
+  def watched() = allWatchers.map(_.countWatched).sum
 
   /**
    * Return the number of delayed operations in the expiry queue
@@ -272,7 +278,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
       if (watchersForKey.get(key) != watchers)
         return
 
-      if (watchers != null && watchers.watched == 0) {
+      if (watchers != null && watchers.isEmpty) {
         watchersForKey.remove(key)
       }
     }
@@ -291,35 +297,35 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
    * A linked list of watched delayed operations based on some key
    */
   private class Watchers(val key: Any) {
+    private[this] val operations = new ConcurrentLinkedQueue[T]()
 
-    private[this] val operations = new LinkedList[T]()
+    // count the current number of watched operations. This is O(n), so use isEmpty() if possible
+    def countWatched: Int = operations.size
 
-    def watched: Int = operations synchronized operations.size
+    def isEmpty: Boolean = operations.isEmpty
 
     // add the element to watch
     def watch(t: T) {
-      operations synchronized operations.add(t)
+      operations.add(t)
     }
 
     // traverse the list and try to complete some watched elements
     def tryCompleteWatched(): Int = {
-
       var completed = 0
-      operations synchronized {
-        val iter = operations.iterator()
-        while (iter.hasNext) {
-          val curr = iter.next()
-          if (curr.isCompleted) {
-            // another thread has completed this operation, just remove it
-            iter.remove()
-          } else if (curr synchronized curr.tryComplete()) {
-            completed += 1
-            iter.remove()
-          }
+
+      val iter = operations.iterator()
+      while (iter.hasNext) {
+        val curr = iter.next()
+        if (curr.isCompleted) {
+          // another thread has completed this operation, just remove it
+          iter.remove()
+        } else if (curr.safeTryComplete()) {
+          iter.remove()
+          completed += 1
         }
       }
 
-      if (operations.size == 0)
+      if (operations.isEmpty)
         removeKeyIfEmpty(key, this)
 
       completed
@@ -328,18 +334,17 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
     // traverse the list and purge elements that are already completed by others
     def purgeCompleted(): Int = {
       var purged = 0
-      operations synchronized {
-        val iter = operations.iterator()
-        while (iter.hasNext) {
-          val curr = iter.next()
-          if (curr.isCompleted) {
-            iter.remove()
-            purged += 1
-          }
+
+      val iter = operations.iterator()
+      while (iter.hasNext) {
+        val curr = iter.next()
+        if (curr.isCompleted) {
+          iter.remove()
+          purged += 1
         }
       }
 
-      if (operations.size == 0)
+      if (operations.isEmpty)
         removeKeyIfEmpty(key, this)
 
       purged

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index ba89fc8..addd232 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -237,7 +237,6 @@ object TestPurgatoryPerformance {
   }
 
   private class FakeOperation(delayMs: Long, size: Int, val latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) {
-    private[this] val data = new Array[Byte](size)
     val completesAt = System.currentTimeMillis + latencyMs
 
     def onExpiration(): Unit = {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index ae0d12f..dccde2f 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -62,8 +62,8 @@ class DelayedOperationTest {
     assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
     r1.awaitExpiration()
     val elapsed = SystemTime.hiResClockMs - start
-    assertTrue("r1 completed due to expiration", r1.isCompleted())
-    assertFalse("r2 hasn't completed", r2.isCompleted())
+    assertTrue("r1 completed due to expiration", r1.isCompleted)
+    assertFalse("r2 hasn't completed", r2.isCompleted)
     assertTrue(s"Time for expiration $elapsed should at least $expiration", elapsed >= expiration)
   }
 


[2/2] kafka git commit: KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask

Posted by gu...@apache.org.
KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask

Instead of throwing `UnsupportedOperationException` from `StandbyTask.recordCollector()` return a No-op implementation of `RecordCollector`.
Refactored `RecordCollector` to have an interface and impl.

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2212 from dguy/standby-task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/abfee854
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/abfee854
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/abfee854

Branch: refs/heads/0.10.1
Commit: abfee8549b920ca88b008b59f651324ce0b0c05e
Parents: 02e75a2
Author: Damian Guy <da...@gmail.com>
Authored: Tue Dec 6 11:49:54 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 6 12:09:34 2016 -0800

----------------------------------------------------------------------
 .../processor/internals/RecordCollector.java    | 113 ++---------------
 .../internals/RecordCollectorImpl.java          | 123 +++++++++++++++++++
 .../processor/internals/StandbyContextImpl.java |  27 +++-
 .../streams/processor/internals/StreamTask.java |   4 +-
 .../QueryableStateIntegrationTest.java          |   1 +
 .../internals/RecordCollectorTest.java          |   8 +-
 .../processor/internals/SinkNodeTest.java       | 117 ++----------------
 .../processor/internals/StandbyTaskTest.java    |  31 +++++
 .../streams/state/KeyValueStoreTestDriver.java  |   3 +-
 .../state/internals/RocksDBWindowStoreTest.java |  25 ++--
 .../state/internals/StoreChangeLoggerTest.java  |   4 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   4 +-
 .../apache/kafka/test/MockProcessorContext.java |   6 +-
 .../apache/kafka/test/NoOpRecordCollector.java  |   4 +-
 14 files changed, 232 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 63d6a3b..6d7d561 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -5,126 +5,39 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+public interface RecordCollector {
+    <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer);
+
+    <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+                     StreamPartitioner<K, V> partitioner);
 
-public class RecordCollector {
-    private static final int MAX_SEND_ATTEMPTS = 3;
-    private static final long SEND_RETRY_BACKOFF = 100L;
+    void flush();
+
+    void close();
 
     /**
-     * A supplier of a {@link RecordCollector} instance.
+     * A supplier of a {@link RecordCollectorImpl} instance.
      */
-    public interface Supplier {
+    interface Supplier {
         /**
          * Get the record collector.
          * @return the record collector
          */
         RecordCollector recordCollector();
     }
-
-    private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
-
-    private final Producer<byte[], byte[]> producer;
-    private final Map<TopicPartition, Long> offsets;
-    private final String logPrefix;
-
-
-    public RecordCollector(Producer<byte[], byte[]> producer, String streamTaskId) {
-        this.producer = producer;
-        this.offsets = new HashMap<>();
-        this.logPrefix = String.format("task [%s]", streamTaskId);
-    }
-
-    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        send(record, keySerializer, valueSerializer, null);
-    }
-
-    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                            StreamPartitioner<K, V> partitioner) {
-        byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
-        byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
-        Integer partition = record.partition();
-        if (partition == null && partitioner != null) {
-            List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
-            if (partitions != null && partitions.size() > 0)
-                partition = partitioner.partition(record.key(), record.value(), partitions.size());
-        }
-
-        ProducerRecord<byte[], byte[]> serializedRecord =
-                new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
-        final String topic = serializedRecord.topic();
-
-        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
-            try {
-                this.producer.send(serializedRecord, new Callback() {
-                    @Override
-                    public void onCompletion(RecordMetadata metadata, Exception exception) {
-                        if (exception == null) {
-                            TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                            offsets.put(tp, metadata.offset());
-                        } else {
-                            log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
-                        }
-                    }
-                });
-                return;
-            } catch (TimeoutException e) {
-                if (attempt == MAX_SEND_ATTEMPTS) {
-                    throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
-                }
-                log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
-                Utils.sleep(SEND_RETRY_BACKOFF);
-            }
-
-        }
-    }
-
-    public void flush() {
-        log.debug("{} Flushing producer", logPrefix);
-        this.producer.flush();
-    }
-
-    /**
-     * Closes this RecordCollector
-     */
-    public void close() {
-        producer.close();
-    }
-
-    /**
-     * The last ack'd offset from the producer
-     *
-     * @return the map from TopicPartition to offset
-     */
-    Map<TopicPartition, Long> offsets() {
-        return this.offsets;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
new file mode 100644
index 0000000..0dbad5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordCollectorImpl implements RecordCollector {
+    private static final int MAX_SEND_ATTEMPTS = 3;
+    private static final long SEND_RETRY_BACKOFF = 100L;
+
+    private static final Logger log = LoggerFactory.getLogger(RecordCollectorImpl.class);
+
+    private final Producer<byte[], byte[]> producer;
+    private final Map<TopicPartition, Long> offsets;
+    private final String logPrefix;
+
+
+    public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
+        this.producer = producer;
+        this.offsets = new HashMap<>();
+        this.logPrefix = String.format("task [%s]", streamTaskId);
+    }
+
+    @Override
+    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        send(record, keySerializer, valueSerializer, null);
+    }
+
+    @Override
+    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+                            StreamPartitioner<K, V> partitioner) {
+        byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
+        byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
+        Integer partition = record.partition();
+        if (partition == null && partitioner != null) {
+            List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
+            if (partitions != null && partitions.size() > 0)
+                partition = partitioner.partition(record.key(), record.value(), partitions.size());
+        }
+
+        ProducerRecord<byte[], byte[]> serializedRecord =
+                new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
+        final String topic = serializedRecord.topic();
+
+        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
+            try {
+                this.producer.send(serializedRecord, new Callback() {
+                    @Override
+                    public void onCompletion(RecordMetadata metadata, Exception exception) {
+                        if (exception == null) {
+                            TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                            offsets.put(tp, metadata.offset());
+                        } else {
+                            log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
+                        }
+                    }
+                });
+                return;
+            } catch (TimeoutException e) {
+                if (attempt == MAX_SEND_ATTEMPTS) {
+                    throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
+                }
+                log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
+                Utils.sleep(SEND_RETRY_BACKOFF);
+            }
+
+        }
+    }
+
+    @Override
+    public void flush() {
+        log.debug("{} Flushing producer", logPrefix);
+        this.producer.flush();
+    }
+
+    /**
+     * Closes this RecordCollector
+     */
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    /**
+     * The last ack'd offset from the producer
+     *
+     * @return the map from TopicPartition to offset
+     */
+    Map<TopicPartition, Long> offsets() {
+        return this.offsets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 80c0026..9ce6595 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,11 +17,14 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.io.File;
@@ -29,6 +32,28 @@ import java.util.Map;
 
 public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier {
 
+    private static final RecordCollector NO_OP_COLLECTOR = new RecordCollector() {
+        @Override
+        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+
+        }
+
+        @Override
+        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) {
+
+        }
+
+        @Override
+        public void flush() {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+    };
+
     private final TaskId id;
     private final String applicationId;
     private final StreamsMetrics metrics;
@@ -78,7 +103,7 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
 
     @Override
     public RecordCollector recordCollector() {
-        throw new UnsupportedOperationException();
+        return NO_OP_COLLECTOR;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 9a2f03e..6651705 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -54,7 +54,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
     private final PunctuationQueue punctuationQueue;
 
     private final Map<TopicPartition, Long> consumedOffsets;
-    private final RecordCollector recordCollector;
+    private final RecordCollectorImpl recordCollector;
     private final int maxBufferedSize;
 
     private boolean commitRequested = false;
@@ -109,7 +109,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         this.consumedOffsets = new HashMap<>();
 
         // create the record recordCollector that maintains the produced offsets
-        this.recordCollector = new RecordCollector(producer, id().toString());
+        this.recordCollector = new RecordCollectorImpl(producer, id().toString());
 
         // initialize the topology with its own context
         this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 66b6d2e..7df2798 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -144,6 +144,7 @@ public class QueryableStateIntegrationTest {
             .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
 
 
         stringComparator = new Comparator<KeyValue<String, String>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 23abf8a..66397fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -70,7 +70,7 @@ public class RecordCollectorTest {
     @Test
     public void testSpecificPartition() {
 
-        RecordCollector collector = new RecordCollector(
+        RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
                 "RecordCollectorTest-TestSpecificPartition");
 
@@ -102,7 +102,7 @@ public class RecordCollectorTest {
     @Test
     public void testStreamPartitioner() {
 
-        RecordCollector collector = new RecordCollector(
+        RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
                 "RecordCollectorTest-TestStreamPartitioner");
 
@@ -129,7 +129,7 @@ public class RecordCollectorTest {
     @Test
     public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
         final AtomicInteger attempt = new AtomicInteger(0);
-        RecordCollector collector = new RecordCollector(
+        RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
                     public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
@@ -149,7 +149,7 @@ public class RecordCollectorTest {
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
-        RecordCollector collector = new RecordCollector(
+        RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
                     public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 3b41517..8ae250c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -17,129 +17,28 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
-import java.io.File;
-import java.util.Map;
-
 public class SinkNodeTest {
 
     @Test(expected = StreamsException.class)
     @SuppressWarnings("unchecked")
     public void invalidInputRecordTimestampTest() {
         final Serializer anySerializer = Serdes.Bytes().serializer();
+        final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null));
+        context.setTime(-1);
 
         final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
-        sink.init(new MockProcessorContext());
+        sink.init(context);
 
         sink.process(null, null);
     }
-
-    private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
-        private final long invalidTimestamp = -1;
-
-        @Override
-        public String applicationId() {
-            return null;
-        }
-
-        @Override
-        public TaskId taskId() {
-            return null;
-        }
-
-        @Override
-        public Serde<?> keySerde() {
-            return null;
-        }
-
-        @Override
-        public Serde<?> valueSerde() {
-            return null;
-        }
-
-        @Override
-        public File stateDir() {
-            return null;
-        }
-
-        @Override
-        public StreamsMetrics metrics() {
-            return null;
-        }
-
-        @Override
-        public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
-        }
-
-        @Override
-        public StateStore getStateStore(String name) {
-            return null;
-        }
-
-        @Override
-        public void schedule(long interval) {
-        }
-
-        @Override
-        public <K, V> void forward(K key, V value) {
-        }
-
-        @Override
-        public <K, V> void forward(K key, V value, int childIndex) {
-        }
-
-        @Override
-        public <K, V> void forward(K key, V value, String childName) {
-        }
-
-        @Override
-        public void commit() {
-        }
-
-        @Override
-        public String topic() {
-            return null;
-        }
-
-        @Override
-        public int partition() {
-            return 0;
-        }
-
-        @Override
-        public long offset() {
-            return 0;
-        }
-
-        @Override
-        public long timestamp() {
-            return invalidTimestamp;
-        }
-
-        @Override
-        public Map<String, Object> appConfigs() {
-            return null;
-        }
-
-        @Override
-        public Map<String, Object> appConfigsWithPrefix(String prefix) {
-            return null;
-        }
-
-        @Override
-        public RecordCollector recordCollector() {
-            return null;
-        }
-    }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index afd9bb6..b28b8d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -303,6 +306,34 @@ public class StandbyTaskTest {
 
     }
 
+    @Test
+    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
+        final String changelogName = "test-application-my-store-changelog";
+        final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
+        consumer.assign(partitions);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
+        consumer.commitSync(committedOffsets);
+
+        restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
+                new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.stream("topic").groupByKey().count("my-store");
+        final ProcessorTopology topology = builder.build(0);
+        StreamsConfig config = createConfig(baseDir);
+        new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, new StreamsMetrics() {
+            @Override
+            public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
+                return null;
+            }
+
+            @Override
+            public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
+
+            }
+        }, stateDirectory);
+
+    }
     private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index aca974b..68a80d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -199,7 +200,7 @@ public class KeyValueStoreTestDriver<K, V> {
         ByteArraySerializer rawSerializer = new ByteArraySerializer();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
-        this.recordCollector = new RecordCollector(producer, "KeyValueStoreTestDriver") {
+        this.recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
             @SuppressWarnings("unchecked")
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index f47bc24..b15ebab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -79,7 +80,7 @@ public class RocksDBWindowStoreTest {
     public void shouldOnlyIterateOpenSegments() throws Exception {
         final File baseDir = TestUtils.tempDirectory();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-        RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+        RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
             }
@@ -126,7 +127,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -200,7 +201,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -289,7 +290,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -376,7 +377,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -432,7 +433,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "anyTaskID") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "anyTaskID") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -460,7 +461,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRolling") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRolling") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -575,7 +576,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestore") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRestore") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -624,7 +625,7 @@ public class RocksDBWindowStoreTest {
         File baseDir2 = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestoreII") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRestoreII") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -679,7 +680,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     // do nothing
@@ -782,7 +783,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     // do nothing
@@ -845,7 +846,7 @@ public class RocksDBWindowStoreTest {
     public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception {
         final File baseDir = TestUtils.tempDirectory();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-        RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+        RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index fbfffb9..9189a14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
@@ -41,7 +41,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
-            new RecordCollector(null, "StoreChangeLoggerTest") {
+            new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
                 @SuppressWarnings("unchecked")
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 14e15a2..f51cc0e 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
@@ -246,7 +246,7 @@ public class KStreamTestDriver {
     }
 
 
-    private class MockRecordCollector extends RecordCollector {
+    private class MockRecordCollector extends RecordCollectorImpl {
         public MockRecordCollector() {
             super(null, "KStreamTestDriver");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cafdd9e..f4ab642 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -21,13 +21,13 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 880a93b..d4368d3 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -19,9 +19,9 @@ package org.apache.kafka.test;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 
-public class NoOpRecordCollector extends RecordCollector {
+public class NoOpRecordCollector extends RecordCollectorImpl {
     public NoOpRecordCollector() {
         super(null, "NoOpRecordCollector");
     }