Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/06 19:49:57 UTC

kafka git commit: KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask

Repository: kafka
Updated Branches:
  refs/heads/trunk 8fd5b6a66 -> a4592a186


KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask

Instead of throwing an `UnsupportedOperationException` from `StandbyTask.recordCollector()`, return a no-op implementation of `RecordCollector`.
Refactored `RecordCollector` into a `RecordCollector` interface and a `RecordCollectorImpl` implementation.
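
The fix is the classic null-object pattern. A condensed sketch of the idea (the wrapper class `NoOpCollectorExample` and the `logChange` helper are illustrative only; the `RecordCollector` method signatures match the diff below):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.streams.processor.StreamPartitioner;
    import org.apache.kafka.streams.processor.internals.RecordCollector;

    public class NoOpCollectorExample {
        // A collector whose methods intentionally do nothing, so standby tasks
        // can run the same initialization path as active tasks without producing.
        static final RecordCollector NO_OP = new RecordCollector() {
            @Override
            public <K, V> void send(final ProducerRecord<K, V> record,
                                    final Serializer<K> keySerializer,
                                    final Serializer<V> valueSerializer) { }

            @Override
            public <K, V> void send(final ProducerRecord<K, V> record,
                                    final Serializer<K> keySerializer,
                                    final Serializer<V> valueSerializer,
                                    final StreamPartitioner<K, V> partitioner) { }

            @Override
            public void flush() { }   // nothing buffered

            @Override
            public void close() { }   // nothing to release
        };

        // Caller code is agnostic: an active task's RecordCollectorImpl produces
        // to the changelog topic, a standby task's no-op collector drops the send.
        static <K, V> void logChange(final RecordCollector collector, final String topic,
                                     final K key, final V value,
                                     final Serializer<K> keySer, final Serializer<V> valSer) {
            collector.send(new ProducerRecord<>(topic, key, value), keySer, valSer);
        }
    }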

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2212 from dguy/standby-task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a4592a18
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a4592a18
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a4592a18

Branch: refs/heads/trunk
Commit: a4592a18641f84a1983c5fe7e697a8b0ab43eb25
Parents: 8fd5b6a
Author: Damian Guy <da...@gmail.com>
Authored: Tue Dec 6 11:49:54 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 6 11:49:54 2016 -0800

----------------------------------------------------------------------
 .../processor/internals/RecordCollector.java    | 113 ++---------------
 .../internals/RecordCollectorImpl.java          | 123 +++++++++++++++++++
 .../processor/internals/StandbyContextImpl.java |  27 +++-
 .../streams/processor/internals/StreamTask.java |   4 +-
 .../QueryableStateIntegrationTest.java          |   1 +
 .../internals/RecordCollectorTest.java          |   8 +-
 .../processor/internals/SinkNodeTest.java       |   4 +-
 .../processor/internals/StandbyTaskTest.java    |  31 +++++
 .../streams/state/KeyValueStoreTestDriver.java  |   3 +-
 .../state/internals/RocksDBWindowStoreTest.java |  25 ++--
 .../state/internals/StoreChangeLoggerTest.java  |   4 +-
 .../apache/kafka/test/KStreamTestDriver.java    |   4 +-
 .../apache/kafka/test/MockProcessorContext.java |   6 +-
 .../apache/kafka/test/NoOpRecordCollector.java  |   4 +-
 14 files changed, 226 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 63d6a3b..6d7d561 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -5,126 +5,39 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+public interface RecordCollector {
+    <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer);
+
+    <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+                     StreamPartitioner<K, V> partitioner);
 
-public class RecordCollector {
-    private static final int MAX_SEND_ATTEMPTS = 3;
-    private static final long SEND_RETRY_BACKOFF = 100L;
+    void flush();
+
+    void close();
 
     /**
-     * A supplier of a {@link RecordCollector} instance.
+     * A supplier of a {@link RecordCollectorImpl} instance.
      */
-    public interface Supplier {
+    interface Supplier {
         /**
          * Get the record collector.
          * @return the record collector
          */
         RecordCollector recordCollector();
     }
-
-    private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
-
-    private final Producer<byte[], byte[]> producer;
-    private final Map<TopicPartition, Long> offsets;
-    private final String logPrefix;
-
-
-    public RecordCollector(Producer<byte[], byte[]> producer, String streamTaskId) {
-        this.producer = producer;
-        this.offsets = new HashMap<>();
-        this.logPrefix = String.format("task [%s]", streamTaskId);
-    }
-
-    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
-        send(record, keySerializer, valueSerializer, null);
-    }
-
-    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
-                            StreamPartitioner<K, V> partitioner) {
-        byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
-        byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
-        Integer partition = record.partition();
-        if (partition == null && partitioner != null) {
-            List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
-            if (partitions != null && partitions.size() > 0)
-                partition = partitioner.partition(record.key(), record.value(), partitions.size());
-        }
-
-        ProducerRecord<byte[], byte[]> serializedRecord =
-                new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
-        final String topic = serializedRecord.topic();
-
-        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
-            try {
-                this.producer.send(serializedRecord, new Callback() {
-                    @Override
-                    public void onCompletion(RecordMetadata metadata, Exception exception) {
-                        if (exception == null) {
-                            TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                            offsets.put(tp, metadata.offset());
-                        } else {
-                            log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
-                        }
-                    }
-                });
-                return;
-            } catch (TimeoutException e) {
-                if (attempt == MAX_SEND_ATTEMPTS) {
-                    throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
-                }
-                log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
-                Utils.sleep(SEND_RETRY_BACKOFF);
-            }
-
-        }
-    }
-
-    public void flush() {
-        log.debug("{} Flushing producer", logPrefix);
-        this.producer.flush();
-    }
-
-    /**
-     * Closes this RecordCollector
-     */
-    public void close() {
-        producer.close();
-    }
-
-    /**
-     * The last ack'd offset from the producer
-     *
-     * @return the map from TopicPartition to offset
-     */
-    Map<TopicPartition, Long> offsets() {
-        return this.offsets;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
new file mode 100644
index 0000000..0dbad5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordCollectorImpl implements RecordCollector {
+    private static final int MAX_SEND_ATTEMPTS = 3;
+    private static final long SEND_RETRY_BACKOFF = 100L;
+
+    private static final Logger log = LoggerFactory.getLogger(RecordCollectorImpl.class);
+
+    private final Producer<byte[], byte[]> producer;
+    private final Map<TopicPartition, Long> offsets;
+    private final String logPrefix;
+
+
+    public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
+        this.producer = producer;
+        this.offsets = new HashMap<>();
+        this.logPrefix = String.format("task [%s]", streamTaskId);
+    }
+
+    @Override
+    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        send(record, keySerializer, valueSerializer, null);
+    }
+
+    @Override
+    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+                            StreamPartitioner<K, V> partitioner) {
+        byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
+        byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
+        Integer partition = record.partition();
+        if (partition == null && partitioner != null) {
+            List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
+            if (partitions != null && partitions.size() > 0)
+                partition = partitioner.partition(record.key(), record.value(), partitions.size());
+        }
+
+        ProducerRecord<byte[], byte[]> serializedRecord =
+                new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
+        final String topic = serializedRecord.topic();
+
+        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
+            try {
+                this.producer.send(serializedRecord, new Callback() {
+                    @Override
+                    public void onCompletion(RecordMetadata metadata, Exception exception) {
+                        if (exception == null) {
+                            TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+                            offsets.put(tp, metadata.offset());
+                        } else {
+                            log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
+                        }
+                    }
+                });
+                return;
+            } catch (TimeoutException e) {
+                if (attempt == MAX_SEND_ATTEMPTS) {
+                    throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
+                }
+                log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
+                Utils.sleep(SEND_RETRY_BACKOFF);
+            }
+
+        }
+    }
+
+    @Override
+    public void flush() {
+        log.debug("{} Flushing producer", logPrefix);
+        this.producer.flush();
+    }
+
+    /**
+     * Closes this RecordCollector
+     */
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    /**
+     * The last ack'd offset from the producer
+     *
+     * @return the map from TopicPartition to offset
+     */
+    Map<TopicPartition, Long> offsets() {
+        return this.offsets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 80c0026..9ce6595 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,11 +17,14 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import java.io.File;
@@ -29,6 +32,28 @@ import java.util.Map;
 
 public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier {
 
+    private static final RecordCollector NO_OP_COLLECTOR = new RecordCollector() {
+        @Override
+        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+
+        }
+
+        @Override
+        public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) {
+
+        }
+
+        @Override
+        public void flush() {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+    };
+
     private final TaskId id;
     private final String applicationId;
     private final StreamsMetrics metrics;
@@ -78,7 +103,7 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
 
     @Override
     public RecordCollector recordCollector() {
-        throw new UnsupportedOperationException();
+        return NO_OP_COLLECTOR;
     }
 
     @Override
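
For context on where the old `UnsupportedOperationException` surfaced: `StandbyContextImpl` implements `RecordCollector.Supplier`, and store-side change logging obtains the collector through that interface while state stores are being initialized. A rough sketch of that shape (hypothetical class; not the exact internal call path):

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.internals.RecordCollector;

    class ChangeLoggingInitSketch {
        void init(final ProcessorContext context) {
            // The context doubles as a RecordCollector.Supplier; on a standby
            // task this call used to throw, and now returns NO_OP_COLLECTOR.
            final RecordCollector collector =
                    ((RecordCollector.Supplier) context).recordCollector();
            collector.flush(); // harmless: a no-op on a standby task
        }
    }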

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 0cd8127..a40e1be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -55,7 +55,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
     private final Map<TopicPartition, RecordQueue> partitionQueues;
 
     private final Map<TopicPartition, Long> consumedOffsets;
-    private final RecordCollector recordCollector;
+    private final RecordCollectorImpl recordCollector;
     private final int maxBufferedSize;
 
     private boolean commitRequested = false;
@@ -110,7 +110,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
         this.consumedOffsets = new HashMap<>();
 
         // create the record recordCollector that maintains the produced offsets
-        this.recordCollector = new RecordCollector(producer, id().toString());
+        this.recordCollector = new RecordCollectorImpl(producer, id().toString());
 
         // initialize the topology with its own context
         this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 0c5c79c..02c9fc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -143,6 +143,7 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration
             .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
 
 
         stringComparator = new Comparator<KeyValue<String, String>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 23abf8a..66397fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -70,7 +70,7 @@ public class RecordCollectorTest {
     @Test
     public void testSpecificPartition() {
 
-        RecordCollector collector = new RecordCollector(
+        RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
                 "RecordCollectorTest-TestSpecificPartition");
 
@@ -102,7 +102,7 @@ public class RecordCollectorTest {
     @Test
     public void testStreamPartitioner() {
 
-        RecordCollector collector = new RecordCollector(
+        RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
                 "RecordCollectorTest-TestStreamPartitioner");
 
@@ -129,7 +129,7 @@ public class RecordCollectorTest {
     @Test
     public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
         final AtomicInteger attempt = new AtomicInteger(0);
-        RecordCollector collector = new RecordCollector(
+        RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
                     public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
@@ -149,7 +149,7 @@ public class RecordCollectorTest {
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
-        RecordCollector collector = new RecordCollector(
+        RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
                     public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 51dc7d2..0c00e69 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -37,7 +37,7 @@ public class SinkNodeTest {
         final Serializer anySerializer = Serdes.Bytes().serializer();
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
 
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollector(null, null));
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde,  new RecordCollectorImpl(null, null));
         context.setTime(-1);
 
         final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
@@ -54,7 +54,7 @@ public class SinkNodeTest {
 
         Properties config = new Properties();
         config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollector(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
+        final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(new MockProducer<byte[], byte[]>(true, anySerializer, anySerializer), null));
         context.setTime(0);
 
         final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index afd9bb6..b28b8d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -303,6 +306,34 @@ public class StandbyTaskTest {
 
     }
 
+    @Test
+    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
+        final String changelogName = "test-application-my-store-changelog";
+        final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
+        consumer.assign(partitions);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+        committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
+        consumer.commitSync(committedOffsets);
+
+        restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
+                new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.stream("topic").groupByKey().count("my-store");
+        final ProcessorTopology topology = builder.build(0);
+        StreamsConfig config = createConfig(baseDir);
+        new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, new StreamsMetrics() {
+            @Override
+            public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
+                return null;
+            }
+
+            @Override
+            public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
+
+            }
+        }, stateDirectory);
+
+    }
     private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index aca974b..68a80d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorContext;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -199,7 +200,7 @@ public class KeyValueStoreTestDriver<K, V> {
         ByteArraySerializer rawSerializer = new ByteArraySerializer();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
-        this.recordCollector = new RecordCollector(producer, "KeyValueStoreTestDriver") {
+        this.recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
             @SuppressWarnings("unchecked")
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index f47bc24..b15ebab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -79,7 +80,7 @@ public class RocksDBWindowStoreTest {
     public void shouldOnlyIterateOpenSegments() throws Exception {
         final File baseDir = TestUtils.tempDirectory();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-        RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+        RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
             }
@@ -126,7 +127,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -200,7 +201,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -289,7 +290,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -376,7 +377,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -432,7 +433,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "anyTaskID") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "anyTaskID") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -460,7 +461,7 @@ public class RocksDBWindowStoreTest {
         try {
             final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRolling") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRolling") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -575,7 +576,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestore") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRestore") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -624,7 +625,7 @@ public class RocksDBWindowStoreTest {
         File baseDir2 = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestoreII") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRestoreII") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     changeLog.add(new KeyValue<>(
@@ -679,7 +680,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     // do nothing
@@ -782,7 +783,7 @@ public class RocksDBWindowStoreTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-            RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
+            RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                     // do nothing
@@ -845,7 +846,7 @@ public class RocksDBWindowStoreTest {
     public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception {
         final File baseDir = TestUtils.tempDirectory();
         Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
-        RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+        RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index ce4a360..94b5794 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
@@ -42,7 +42,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> written = new HashMap<>();
 
     private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
-            new RecordCollector(null, "StoreChangeLoggerTest") {
+            new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
                 @SuppressWarnings("unchecked")
                 @Override
                 public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 14e15a2..f51cc0e 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
@@ -246,7 +246,7 @@ public class KStreamTestDriver {
     }
 
 
-    private class MockRecordCollector extends RecordCollector {
+    private class MockRecordCollector extends RecordCollectorImpl {
         public MockRecordCollector() {
             super(null, "KStreamTestDriver");
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cafdd9e..f4ab642 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -21,13 +21,13 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a4592a18/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 880a93b..d4368d3 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -19,9 +19,9 @@ package org.apache.kafka.test;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 
-public class NoOpRecordCollector extends RecordCollector {
+public class NoOpRecordCollector extends RecordCollectorImpl {
     public NoOpRecordCollector() {
         super(null, "NoOpRecordCollector");
     }