Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/16 22:28:24 UTC

kafka git commit: KAFKA-2811: add standby tasks

Repository: kafka
Updated Branches:
  refs/heads/trunk 356544cab -> 45e7f7130


KAFKA-2811: add standby tasks

guozhangwang
* added a new config param "num.standby.replicas" (the default value is 0; see the configuration sketch after this list).
* added a new abstract class AbstractTask
* added StandbyTask as a subclass of AbstractTask
* modified StreamTask to be a subclass of AbstractTask
* StreamThread
  * standby tasks are created by calling StreamThread.addStandbyTasks() from onPartitionsAssigned()
  * standby tasks are destroyed by calling StreamThread.removeStandbyTasks() from onPartitionsRevoked()
  * In addStandbyTasks(), change log partitions are assigned to restoreConsumer.
  * In removeStandbyTasks(), change log partitions are removed from restoreConsumer.
  * StreamThread polls change log records using restoreConsumer in the runLoop with timeout=0.
  * If records are returned, StreamThread calls StandbyTask.update() and passes the records to each standby task.
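
For reference, a minimal sketch of how an application could enable standby replicas once this change is in. It only exercises the new "num.standby.replicas" property introduced by this commit; the wrapper class and the remaining properties are placeholders, and the other required streaming configs would be set as usual.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamingConfig;

    public class StandbyReplicasExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // ... the usual required streaming configs (serializers, client settings, etc.) go here ...
            // keep one standby replica of each task's state on another instance (default is 0)
            props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
            StreamingConfig config = new StreamingConfig(props);
        }
    }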

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #526 from ymatsuda/standby_task


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45e7f713
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45e7f713
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45e7f713

Branch: refs/heads/trunk
Commit: 45e7f71309f9a3e30d25a6ddd3171c67e3e79286
Parents: 356544c
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Nov 16 13:34:42 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 16 13:34:42 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/StreamingConfig.java   |  10 +
 .../processor/internals/AbstractTask.java       |  93 +++++++++
 .../KafkaStreamingPartitionAssignor.java        |   6 +-
 .../internals/ProcessorStateManager.java        | 103 +++++++---
 .../processor/internals/StandbyContextImpl.java | 164 ++++++++++++++++
 .../processor/internals/StandbyTask.java        |  88 +++++++++
 .../streams/processor/internals/StreamTask.java |  56 +-----
 .../processor/internals/StreamThread.java       | 143 ++++++++++----
 .../KafkaStreamingPartitionAssignorTest.java    | 174 +++++++++++++----
 .../internals/ProcessorStateManagerTest.java    |  93 +++++++--
 .../processor/internals/StandbyTaskTest.java    | 190 +++++++++++++++++++
 .../processor/internals/StreamThreadTest.java   |  12 +-
 12 files changed, 951 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index 693cb0c..f563070 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -54,6 +54,10 @@ public class StreamingConfig extends AbstractConfig {
     public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
     private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
 
+    /** <code>num.standby.replicas</code> */
+    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
+    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
+
     /** <code>buffered.records.per.partition</code> */
     public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
     private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
@@ -136,6 +140,11 @@ public class StreamingConfig extends AbstractConfig {
                                         1,
                                         Importance.LOW,
                                         NUM_STREAM_THREADS_DOC)
+                                .define(NUM_STANDBY_REPLICAS_CONFIG,
+                                        Type.INT,
+                                        0,
+                                        Importance.LOW,
+                                        NUM_STANDBY_REPLICAS_DOC)
                                 .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
                                         Type.INT,
                                         1000,
@@ -214,6 +223,7 @@ public class StreamingConfig extends AbstractConfig {
 
     public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
         Map<String, Object> props = getConsumerConfigs();
+        props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
         props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
         return props;

http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
new file mode 100644
index 0000000..64bb10d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+public abstract class AbstractTask {
+    protected final TaskId id;
+    protected final ProcessorTopology topology;
+    protected final ProcessorStateManager stateMgr;
+    protected final Set<TopicPartition> partitions;
+    protected ProcessorContext processorContext;
+
+    protected AbstractTask(TaskId id,
+                           Consumer<byte[], byte[]> restoreConsumer,
+                           ProcessorTopology topology,
+                           StreamingConfig config,
+                           Set<TopicPartition> partitions) {
+        this.id = id;
+        this.topology = topology;
+        this.partitions = partitions;
+
+        // create the processor state manager
+        try {
+            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
+            // if partitions is null, this is a standby task
+            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, partitions == null);
+        } catch (IOException e) {
+            throw new KafkaException("Error while creating the state manager", e);
+        }
+    }
+
+    protected void initializeStateStores() {
+        for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
+            StateStore store = stateStoreSupplier.get();
+            store.init(this.processorContext);
+        }
+    }
+
+    public final TaskId id() {
+        return id;
+    }
+
+    public final Set<TopicPartition> partitions() {
+        return this.partitions;
+    }
+
+    public final ProcessorTopology topology() {
+        return topology;
+    }
+
+    public final ProcessorContext context() {
+        return processorContext;
+    }
+
+    public abstract void commit();
+
+    public void close() {
+        try {
+            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+        } catch (IOException e) {
+            throw new KafkaException("Error while closing the state manager in processor context", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 35ba0ec..451b214 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -46,11 +46,14 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
     private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
 
     private StreamThread streamThread;
+    private int numStandbyReplicas;
     private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
     private Set<TaskId> standbyTasks;
 
     @Override
     public void configure(Map<String, ?> configs) {
+        numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
         Object o = configs.get(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
             KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -99,7 +102,6 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         //    - We try not to assign the same set of tasks to two different clients
         //    We do the assignment in one-pass. The result may not satisfy above all.
         // 2. within each client, tasks are assigned to consumer clients in round-robin manner.
-
         Map<UUID, Set<String>> consumersByClient = new HashMap<>();
         Map<UUID, ClientState<TaskId>> states = new HashMap<>();
 
@@ -132,7 +134,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         // Get partition groups from the partition grouper
         Map<TaskId, Set<TopicPartition>> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata);
 
-        states = TaskAssignor.assign(states, partitionGroups.keySet(), 0); // TODO: enable standby tasks
+        states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas);
         Map<String, Assignment> assignment = new HashMap<>();
 
         for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3cb9cea..2a8df9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -35,6 +35,7 @@ import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class ProcessorStateManager {
@@ -51,13 +52,17 @@ public class ProcessorStateManager {
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final Map<TopicPartition, Long> restoredOffsets;
     private final Map<TopicPartition, Long> checkpointedOffsets;
+    private final boolean isStandby;
+    private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
 
-    public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer) throws IOException {
+    public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
         this.partition = partition;
         this.baseDir = baseDir;
         this.stores = new HashMap<>();
         this.restoreConsumer = restoreConsumer;
         this.restoredOffsets = new HashMap<>();
+        this.isStandby = isStandby;
+        this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
 
         // create the state directory for this task if missing (we won't create the parent directory)
         createStateDirectory(baseDir);
@@ -103,8 +108,6 @@ public class ProcessorStateManager {
         if (this.stores.containsKey(store.name()))
             throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
 
-        // ---- register the store ---- //
-
         // check that the underlying change log topic exist or not
         if (restoreConsumer.listTopics().containsKey(store.name())) {
             boolean partitionNotFound = true;
@@ -124,48 +127,91 @@ public class ProcessorStateManager {
 
         this.stores.put(store.name(), store);
 
+        if (isStandby) {
+            if (store.persistent())
+                restoreCallbacks.put(store.name(), stateRestoreCallback);
+        } else {
+            restoreActiveState(store, stateRestoreCallback);
+        }
+    }
+
+    private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) {
+
+        if (store == null)
+            throw new IllegalArgumentException("Store must not be null.");
+
         // ---- try to restore the state from change-log ---- //
 
         // subscribe to the store's partition
-        TopicPartition storePartition = new TopicPartition(store.name(), partition);
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
         }
+        TopicPartition storePartition = new TopicPartition(store.name(), partition);
         restoreConsumer.assign(Collections.singletonList(storePartition));
 
-        // calculate the end offset of the partition
-        // TODO: this is a bit hacky to first seek then position to get the end offset
-        restoreConsumer.seekToEnd(storePartition);
-        long endOffset = restoreConsumer.position(storePartition);
+        try {
+            // calculate the end offset of the partition
+            // TODO: this is a bit hacky to first seek then position to get the end offset
+            restoreConsumer.seekToEnd(storePartition);
+            long endOffset = restoreConsumer.position(storePartition);
+
+            // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
+            // restore the state from the beginning of the change log otherwise
+            if (checkpointedOffsets.containsKey(storePartition)) {
+                restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
+            } else {
+                restoreConsumer.seekToBeginning(storePartition);
+            }
 
-        // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
-        // restore the state from the beginning of the change log otherwise
-        if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) {
-            restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
-        } else {
-            restoreConsumer.seekToBeginning(storePartition);
-        }
+            // restore its state from changelog records; while restoring the log end offset
+            // should not change since it is only written by this thread.
+            while (true) {
+                for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
+                    stateRestoreCallback.restore(record.key(), record.value());
+                }
 
-        // restore its state from changelog records; while restoring the log end offset
-        // should not change since it is only written by this thread.
-        while (true) {
-            for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
-                stateRestoreCallback.restore(record.key(), record.value());
+                if (restoreConsumer.position(storePartition) == endOffset) {
+                    break;
+                } else if (restoreConsumer.position(storePartition) > endOffset) {
+                    throw new IllegalStateException("Log end offset should not change while restoring");
+                }
             }
 
-            if (restoreConsumer.position(storePartition) == endOffset) {
-                break;
-            } else if (restoreConsumer.position(storePartition) > endOffset) {
-                throw new IllegalStateException("Log end offset should not change while restoring");
+            // record the restored offset for its change log partition
+            long newOffset = restoreConsumer.position(storePartition);
+            restoredOffsets.put(storePartition, newOffset);
+        } finally {
+            // un-assign the change log partition
+            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+        }
+    }
+
+    public Map<TopicPartition, Long> checkpointedOffsets() {
+        Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
+
+        for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
+            String storeName = entry.getKey();
+            TopicPartition storePartition = new TopicPartition(storeName, partition);
+
+            if (checkpointedOffsets.containsKey(storePartition)) {
+                partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition));
+            } else {
+                partitionsAndOffsets.put(storePartition, -1L);
             }
         }
+        return partitionsAndOffsets;
+    }
+
+    public void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
+        // restore states from changelog records
+        StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
 
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            restoreCallback.restore(record.key(), record.value());
+        }
         // record the restored offset for its change log partition
         long newOffset = restoreConsumer.position(storePartition);
         restoredOffsets.put(storePartition, newOffset);
-
-        // un-assign the change log partition
-        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
     }
 
     public StateStore getStore(String name) {
@@ -224,6 +270,9 @@ public class ProcessorStateManager {
             checkpoint.write(checkpointOffsets);
         }
 
+        // un-assign the change log partition
+        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+
         // release the state directory directoryLock
         directoryLock.release();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
new file mode 100644
index 0000000..ea95300
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier {
+
+    private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
+
+    private final TaskId id;
+    private final StreamingMetrics metrics;
+    private final ProcessorStateManager stateMgr;
+
+    private final Serializer<?> keySerializer;
+    private final Serializer<?> valSerializer;
+    private final Deserializer<?> keyDeserializer;
+    private final Deserializer<?> valDeserializer;
+
+    private boolean initialized;
+
+    public StandbyContextImpl(TaskId id,
+                              StreamingConfig config,
+                              ProcessorStateManager stateMgr,
+                              StreamingMetrics metrics) {
+        this.id = id;
+        this.metrics = metrics;
+        this.stateMgr = stateMgr;
+
+        this.keySerializer = config.keySerializer();
+        this.valSerializer = config.valueSerializer();
+        this.keyDeserializer = config.keyDeserializer();
+        this.valDeserializer = config.valueDeserializer();
+
+        this.initialized = false;
+    }
+
+    public void initialized() {
+        this.initialized = true;
+    }
+
+    public TaskId id() {
+        return id;
+    }
+
+    public ProcessorStateManager getStateMgr() {
+        return stateMgr;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Serializer<?> keySerializer() {
+        return this.keySerializer;
+    }
+
+    @Override
+    public Serializer<?> valueSerializer() {
+        return this.valSerializer;
+    }
+
+    @Override
+    public Deserializer<?> keyDeserializer() {
+        return this.keyDeserializer;
+    }
+
+    @Override
+    public Deserializer<?> valueDeserializer() {
+        return this.valDeserializer;
+    }
+
+    @Override
+    public File stateDir() {
+        return stateMgr.baseDir();
+    }
+
+    @Override
+    public StreamingMetrics metrics() {
+        return metrics;
+    }
+
+    @Override
+    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
+        if (initialized)
+            throw new KafkaException("Can only create state stores during initialization.");
+
+        stateMgr.register(store, stateRestoreCallback);
+    }
+
+    @Override
+    public StateStore getStateStore(String name) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String topic() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int partition() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long offset() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long timestamp() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value, int childIndex) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void commit() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void schedule(long interval) {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
new file mode 100644
index 0000000..c6442d9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A StandbyTask
+ */
+public class StandbyTask extends AbstractTask {
+
+    private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
+
+    private final Map<TopicPartition, Long> checkpointedOffsets;
+
+    /**
+     * Create {@link StandbyTask} with its assigned partitions
+     *
+     * @param id                    the ID of this task
+     * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
+     * @param topology              the instance of {@link ProcessorTopology}
+     * @param config                the {@link StreamingConfig} specified by the user
+     * @param metrics               the {@link StreamingMetrics} created by the thread
+     */
+    public StandbyTask(TaskId id,
+                       Consumer<byte[], byte[]> restoreConsumer,
+                       ProcessorTopology topology,
+                       StreamingConfig config,
+                       StreamingMetrics metrics) {
+        super(id, restoreConsumer, topology, config, null);
+
+        // initialize the topology with its own context
+        this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+
+        initializeStateStores();
+
+        ((StandbyContextImpl) this.processorContext).initialized();
+
+        this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
+    }
+
+    public Map<TopicPartition, Long> checkpointedOffsets() {
+        return checkpointedOffsets;
+    }
+
+    public Collection<TopicPartition> changeLogPartitions() {
+        return checkpointedOffsets.keySet();
+    }
+
+    /**
+     * Updates a state store using records from one change log partition
+     */
+    public void update(TopicPartition partition, List<ConsumerRecord<byte[], byte[]>> records) {
+        stateMgr.updateStandbyStates(partition, records);
+    }
+
+    public void commit() {
+        stateMgr.flush();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a9c14e5..5d170f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -21,46 +21,37 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
  */
-public class StreamTask implements Punctuator {
+public class StreamTask extends AbstractTask implements Punctuator {
 
     private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
 
-    private final TaskId id;
     private final int maxBufferedSize;
 
     private final Consumer consumer;
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
     private final PunctuationQueue punctuationQueue;
-    private final ProcessorContextImpl processorContext;
-    private final ProcessorTopology topology;
 
     private final Map<TopicPartition, Long> consumedOffsets;
     private final RecordCollector recordCollector;
-    private final ProcessorStateManager stateMgr;
 
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
@@ -89,12 +80,10 @@ public class StreamTask implements Punctuator {
                       ProcessorTopology topology,
                       StreamingConfig config,
                       StreamingMetrics metrics) {
-
-        this.id = id;
+        super(id, restoreConsumer, topology, config, Collections.unmodifiableSet(new HashSet<>(partitions)));
         this.consumer = consumer;
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
-        this.topology = topology;
 
         // create queues for each assigned partition and associate them
         // to corresponding source nodes in the processor topology
@@ -117,22 +106,11 @@ public class StreamTask implements Punctuator {
 
         log.info("Creating restoration consumer client for stream task #" + id());
 
-        // create the processor state manager
-        try {
-            File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
-            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer);
-        } catch (IOException e) {
-            throw new KafkaException("Error while creating the state manager", e);
-        }
-
         // initialize the topology with its own context
         this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
 
         // initialize the state stores
-        for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
-            StateStore store = stateStoreSupplier.get();
-            store.init(this.processorContext);
-        }
+        initializeStateStores();
 
         // initialize the task by initializing all its processor nodes in the topology
         for (ProcessorNode node : this.topology.processors()) {
@@ -144,15 +122,7 @@ public class StreamTask implements Punctuator {
             }
         }
 
-        this.processorContext.initialized();
-    }
-
-    public TaskId id() {
-        return id;
-    }
-
-    public Set<TopicPartition> partitions() {
-        return this.partitionGroup.partitions();
+        ((ProcessorContextImpl) this.processorContext).initialized();
     }
 
     /**
@@ -261,10 +231,6 @@ public class StreamTask implements Punctuator {
         return this.currNode;
     }
 
-    public ProcessorTopology topology() {
-        return this.topology;
-    }
-
     /**
      * Commit the current task state
      */
@@ -335,11 +301,7 @@ public class StreamTask implements Punctuator {
         if (exception != null)
             throw exception;
 
-        try {
-            stateMgr.close(recordCollector.offsets());
-        } catch (IOException e) {
-            throw new KafkaException("Error while closing the state manager in processor context", e);
-        }
+        super.close();
     }
 
     private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
@@ -371,8 +333,4 @@ public class StreamTask implements Punctuator {
         }
     }
 
-    public ProcessorContext context() {
-        return processorContext;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 06e5951..bbaeb14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -78,7 +78,8 @@ public class StreamThread extends Thread {
     protected final Consumer<byte[], byte[]> restoreConsumer;
 
     private final AtomicBoolean running;
-    private final Map<TaskId, StreamTask> tasks;
+    private final Map<TaskId, StreamTask> activeTasks;
+    private final Map<TaskId, StandbyTask> standbyTasks;
     private final Set<TaskId> prevTasks;
     private final String clientId;
     private final Time time;
@@ -96,14 +97,16 @@ public class StreamThread extends Thread {
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
-            addPartitions(assignment);
+            addStreamTasks(assignment);
+            addStandbyTasks();
             lastClean = time.milliseconds(); // start the cleaning cycle
         }
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             commitAll();
-            removePartitions();
+            removeStreamTasks();
+            removeStandbyTasks();
             lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
         }
     };
@@ -141,7 +144,8 @@ public class StreamThread extends Thread {
         this.restoreConsumer = (restoreConsumer != null) ? restoreConsumer : createRestoreConsumer();
 
         // initialize the task list
-        this.tasks = new HashMap<>();
+        this.activeTasks = new HashMap<>();
+        this.standbyTasks = new HashMap<>();
         this.prevTasks = new HashSet<>();
 
         // read in task specific config values
@@ -208,7 +212,7 @@ public class StreamThread extends Thread {
     }
 
     public Map<TaskId, StreamTask> tasks() {
-        return Collections.unmodifiableMap(tasks);
+        return Collections.unmodifiableMap(activeTasks);
     }
 
     private void shutdown() {
@@ -236,7 +240,8 @@ public class StreamThread extends Thread {
             log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
         }
         try {
-            removePartitions();
+            removeStreamTasks();
+            removeStandbyTasks();
         } catch (Throwable e) {
             // already logged in removePartition()
         }
@@ -261,7 +266,7 @@ public class StreamThread extends Thread {
                     ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
 
                     if (!records.isEmpty()) {
-                        for (StreamTask task : tasks.values()) {
+                        for (StreamTask task : activeTasks.values()) {
                             for (TopicPartition partition : task.partitions()) {
                                 task.addRecords(partition, records.records(partition));
                             }
@@ -274,11 +279,11 @@ public class StreamThread extends Thread {
 
                 totalNumBuffered = 0;
 
-                if (!tasks.isEmpty()) {
+                if (!activeTasks.isEmpty()) {
                     // try to process one record from each task
                     requiresPoll = false;
 
-                    for (StreamTask task : tasks.values()) {
+                    for (StreamTask task : activeTasks.values()) {
                         long startProcess = time.milliseconds();
 
                         totalNumBuffered += task.process();
@@ -294,6 +299,10 @@ public class StreamThread extends Thread {
                     requiresPoll = true;
                 }
 
+                if (!standbyTasks.isEmpty()) {
+                    updateStandbyTasks();
+                }
+
                 maybeClean();
             }
         } catch (Exception e) {
@@ -301,6 +310,18 @@ public class StreamThread extends Thread {
         }
     }
 
+    private void updateStandbyTasks() {
+        ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+
+        if (!records.isEmpty()) {
+            for (StandbyTask task : standbyTasks.values()) {
+                for (TopicPartition partition : task.changeLogPartitions()) {
+                    task.update(partition, records.records(partition));
+                }
+            }
+        }
+    }
+
     private boolean stillRunning() {
         if (!running.get()) {
             log.debug("Shutting down at user request.");
@@ -316,7 +337,7 @@ public class StreamThread extends Thread {
     }
 
     private void maybePunctuate() {
-        for (StreamTask task : tasks.values()) {
+        for (StreamTask task : activeTasks.values()) {
             try {
                 long now = time.milliseconds();
 
@@ -324,7 +345,7 @@ public class StreamThread extends Thread {
                     sensors.punctuateTimeSensor.record(time.milliseconds() - now);
 
             } catch (Exception e) {
-                log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+                log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
                 throw e;
             }
         }
@@ -339,12 +360,12 @@ public class StreamThread extends Thread {
             commitAll();
             lastCommit = now;
         } else {
-            for (StreamTask task : tasks.values()) {
+            for (StreamTask task : activeTasks.values()) {
                 try {
                     if (task.commitNeeded())
                         commitOne(task, time.milliseconds());
                 } catch (Exception e) {
-                    log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+                    log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
                     throw e;
                 }
             }
@@ -355,24 +376,22 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
-        for (StreamTask task : tasks.values()) {
-            try {
-                commitOne(task, time.milliseconds());
-            } catch (Exception e) {
-                log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
+        for (StreamTask task : activeTasks.values()) {
+            commitOne(task, time.milliseconds());
+        }
+        for (StandbyTask task : standbyTasks.values()) {
+            commitOne(task, time.milliseconds());
         }
     }
 
     /**
      * Commit the state of a task
      */
-    private void commitOne(StreamTask task, long now) {
+    private void commitOne(AbstractTask task, long now) {
         try {
             task.commit();
         } catch (Exception e) {
-            log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
             throw e;
         }
 
@@ -426,7 +445,7 @@ public class StreamThread extends Thread {
      * Returns ids of tasks that were being executed before the rebalance.
      */
     public Set<TaskId> prevTasks() {
-        return prevTasks;
+        return Collections.unmodifiableSet(prevTasks);
     }
 
     /**
@@ -467,8 +486,7 @@ public class StreamThread extends Thread {
         return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
     }
 
-    private void addPartitions(Collection<TopicPartition> assignment) {
-
+    private void addStreamTasks(Collection<TopicPartition> assignment) {
         HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
 
         for (TopicPartition partition : assignment) {
@@ -486,9 +504,9 @@ public class StreamThread extends Thread {
         // create the tasks
         for (TaskId taskId : partitionsForTask.keySet()) {
             try {
-                tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
+                activeTasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
             } catch (Exception e) {
-                log.error("Failed to create a task #" + taskId + " in thread [" + this.getName() + "]: ", e);
+                log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", e);
                 throw e;
             }
         }
@@ -496,23 +514,68 @@ public class StreamThread extends Thread {
         lastClean = time.milliseconds();
     }
 
-    private void removePartitions() {
-
+    private void removeStreamTasks() {
         // TODO: change this clearing tasks behavior
-        for (StreamTask task : tasks.values()) {
-            log.info("Removing task {}", task.id());
-            try {
-                task.close();
-            } catch (Exception e) {
-                log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
-                throw e;
-            }
-            sensors.taskDestructionSensor.record();
+        for (StreamTask task : activeTasks.values()) {
+            closeOne(task);
         }
+
         prevTasks.clear();
-        prevTasks.addAll(tasks.keySet());
+        prevTasks.addAll(activeTasks.keySet());
+
+        activeTasks.clear();
+    }
+
+    private void closeOne(AbstractTask task) {
+        log.info("Removing a task {}", task.id());
+        try {
+            task.close();
+        } catch (Exception e) {
+            log.error("Failed to close a " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
+            throw e;
+        }
+        sensors.taskDestructionSensor.record();
+    }
+
+    protected StandbyTask createStandbyTask(TaskId id) {
+        sensors.taskCreationSensor.record();
+
+        ProcessorTopology topology = builder.build(id.topicGroupId);
+
+        return new StandbyTask(id, restoreConsumer, topology, config, sensors);
+    }
+
+    private void addStandbyTasks() {
+        Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
+
+        for (TaskId taskId : partitionGrouper.standbyTasks()) {
+            StandbyTask task = createStandbyTask(taskId);
+            standbyTasks.put(taskId, task);
+            checkpointedOffsets.putAll(task.checkpointedOffsets());
+        }
+
+        restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
+
+        for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
+            TopicPartition partition = entry.getKey();
+            long offset = entry.getValue();
+            if (offset >= 0) {
+                restoreConsumer.seek(partition, offset);
+            } else {
+                restoreConsumer.seekToBeginning(partition);
+            }
+        }
+    }
+
+
+    private void removeStandbyTasks() {
+        for (StandbyTask task : standbyTasks.values()) {
+            closeOne(task);
+        }
+        // un-assign the change log partitions
+        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
 
-        tasks.clear();
+        standbyTasks.clear();
     }
 
     private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
index c0df2e7..aa484fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java
@@ -17,12 +17,10 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -42,7 +40,6 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -110,26 +107,6 @@ public class KafkaStreamingPartitionAssignorTest {
         };
     }
 
-    private static class TestStreamTask extends StreamTask {
-        public boolean committed = false;
-
-        public TestStreamTask(TaskId id,
-                              Consumer<byte[], byte[]> consumer,
-                              Producer<byte[], byte[]> producer,
-                              Consumer<byte[], byte[]> restoreConsumer,
-                              Collection<TopicPartition> partitions,
-                              ProcessorTopology topology,
-                              StreamingConfig config) {
-            super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
-        }
-
-        @Override
-        public void commit() {
-            super.commit();
-            committed = true;
-        }
-    }
-
     private ByteArraySerializer serializer = new ByteArraySerializer();
 
     @SuppressWarnings("unchecked")
@@ -165,9 +142,7 @@ public class KafkaStreamingPartitionAssignorTest {
         };
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(
-                Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread)
-        );
+        partitionAssignor.configure(config.getConsumerConfigs(thread));
 
         PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
 
@@ -193,6 +168,103 @@ public class KafkaStreamingPartitionAssignorTest {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+        List<String> topics = Utils.mkList("topic1", "topic2");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+        final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
+        final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
+        final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
+        final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
+        final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
+
+        UUID uuid1 = UUID.randomUUID();
+        UUID uuid2 = UUID.randomUUID();
+
+        StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime());
+
+        KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10));
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        subscriptions.put("consumer10",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+        subscriptions.put("consumer11",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+        subscriptions.put("consumer20",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check assigned partitions
+
+        assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
+                Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions())));
+        assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
+
+        // check assignment info
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        AssignmentInfo info;
+
+        List<TaskId> activeTasks = new ArrayList<>();
+        for (TopicPartition partition : assignments.get("consumer10").partitions()) {
+            activeTasks.add(new TaskId(0, partition.partition()));
+        }
+        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        assertEquals(activeTasks, info.activeTasks);
+        assertEquals(2, info.activeTasks.size());
+        assertEquals(1, new HashSet<>(info.activeTasks).size());
+        assertEquals(0, info.standbyTasks.size());
+
+        allActiveTasks.addAll(info.activeTasks);
+
+        activeTasks.clear();
+        for (TopicPartition partition : assignments.get("consumer11").partitions()) {
+            activeTasks.add(new TaskId(0, partition.partition()));
+        }
+        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        assertEquals(activeTasks, info.activeTasks);
+        assertEquals(2, info.activeTasks.size());
+        assertEquals(1, new HashSet<>(info.activeTasks).size());
+        assertEquals(0, info.standbyTasks.size());
+
+        allActiveTasks.addAll(info.activeTasks);
+
+        // check active tasks assigned to the first client
+        assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
+
+        activeTasks.clear();
+        for (TopicPartition partition : assignments.get("consumer20").partitions()) {
+            activeTasks.add(new TaskId(0, partition.partition()));
+        }
+        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        assertEquals(activeTasks, info.activeTasks);
+        assertEquals(2, info.activeTasks.size());
+        assertEquals(1, new HashSet<>(info.activeTasks).size());
+        assertEquals(0, info.standbyTasks.size());
+
+        allActiveTasks.addAll(info.activeTasks);
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, new HashSet<>(allActiveTasks));
+    }
+
+    @Test
+    public void testAssignWithStandbyReplicas() throws Exception {
+        Properties props = configProps();
+        props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
+        StreamingConfig config = new StreamingConfig(props);
+
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer);
+        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
+        List<String> topics = Utils.mkList("topic1", "topic2");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
         final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
@@ -207,17 +279,15 @@ public class KafkaStreamingPartitionAssignorTest {
         StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid1, new Metrics(), new SystemTime());
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(
-                Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread10)
-        );
+        partitionAssignor.configure(config.getConsumerConfigs(thread10));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(Utils.mkList("topic1", "topic2"), new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -228,24 +298,54 @@ public class KafkaStreamingPartitionAssignorTest {
         assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
 
         // check assignment info
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        Set<TaskId> allStandbyTasks = new HashSet<>();
+        AssignmentInfo info;
 
         List<TaskId> activeTasks = new ArrayList<>();
         for (TopicPartition partition : assignments.get("consumer10").partitions()) {
             activeTasks.add(new TaskId(0, partition.partition()));
         }
-        assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks);
+        info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        assertEquals(activeTasks, info.activeTasks);
+        assertEquals(2, info.activeTasks.size());
+        assertEquals(1, new HashSet<>(info.activeTasks).size());
+
+        allActiveTasks.addAll(info.activeTasks);
+        allStandbyTasks.addAll(info.standbyTasks);
 
         activeTasks.clear();
         for (TopicPartition partition : assignments.get("consumer11").partitions()) {
             activeTasks.add(new TaskId(0, partition.partition()));
         }
-        assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks);
+        info = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        assertEquals(activeTasks, info.activeTasks);
+        assertEquals(2, info.activeTasks.size());
+        assertEquals(1, new HashSet<>(info.activeTasks).size());
+
+        allActiveTasks.addAll(info.activeTasks);
+        allStandbyTasks.addAll(info.standbyTasks);
+
+        // check active tasks assigned to the first client
+        assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
 
         activeTasks.clear();
         for (TopicPartition partition : assignments.get("consumer20").partitions()) {
             activeTasks.add(new TaskId(0, partition.partition()));
         }
-        assertEquals(activeTasks, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks);
+        info = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        assertEquals(activeTasks, info.activeTasks);
+        assertEquals(2, info.activeTasks.size());
+        assertEquals(1, new HashSet<>(info.activeTasks).size());
+
+        allActiveTasks.addAll(info.activeTasks);
+        allStandbyTasks.addAll(info.standbyTasks);
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, new HashSet<>(allActiveTasks));
+
+        assertEquals(3, allStandbyTasks.size());
+        assertEquals(allTasks, new HashSet<>(allStandbyTasks));
     }
 
     @Test
@@ -266,9 +366,7 @@ public class KafkaStreamingPartitionAssignorTest {
         StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", uuid, new Metrics(), new SystemTime());
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
-        partitionAssignor.configure(
-                Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread)
-        );
+        partitionAssignor.configure(config.getConsumerConfigs(thread));
 
         List<TaskId> activeTaskList = Utils.mkList(task0, task3);
         Set<TaskId> standbyTasks = Utils.mkSet(task1, task2);

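For readers who want to try the new knob outside of the assignor tests above, here is a minimal sketch of enabling standby replicas through StreamingConfig. The broker address and serializer choices are placeholders, and any other required settings would be filled in the same way the test configs in this patch do; this is an illustration, not code from the patch.

    import java.util.Properties;

    import org.apache.kafka.streams.StreamingConfig;

    public class StandbyReplicaConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // placeholder broker address
            props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // keep one standby replica per task; the default is 0 (no standby tasks)
            props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
            // ...plus any other required settings (e.g. a timestamp extractor),
            // as in the test configs above

            StreamingConfig config = new StreamingConfig(props);
        }
    }
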
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index c447f99..17bc9da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.nio.channels.FileLock;
 import java.nio.file.Files;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -50,7 +49,7 @@ import static org.junit.Assert.assertFalse;
 
 public class ProcessorStateManagerTest {
 
-    private class MockRestoreConsumer  extends MockConsumer<byte[], byte[]> {
+    public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
         public TopicPartition assignedPartition = null;
@@ -79,7 +78,7 @@ public class ProcessorStateManagerTest {
             recordBuffer.clear();
         }
 
-        // buffer a record (we cannot use addRecord because we need to add records before asigning a partition)
+        // buffer a record (we cannot use addRecord because we need to add records before assigning a partition)
         public void bufferRecord(ConsumerRecord<Integer, Integer> record) {
             recordBuffer.add(
                 new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
@@ -186,7 +185,7 @@ public class ProcessorStateManagerTest {
             FileLock lock;
 
             // the state manager locks the directory
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
+            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false);
 
             try {
                 // this should not get the lock
@@ -215,7 +214,7 @@ public class ProcessorStateManagerTest {
         try {
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
+            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer(), false);
             try {
                 stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
             } finally {
@@ -235,21 +234,24 @@ public class ProcessorStateManagerTest {
             checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
+            restoreConsumer.updatePartitions("persistentStore", Utils.mkList(
                     new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
                     new PartitionInfo("persistentStore", 2, Node.noNode(), new Node[0], new Node[0])
             ));
-            restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
+
+            TopicPartition partition = new TopicPartition("persistentStore", 2);
+            restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
                 ArrayList<Integer> expectedKeys = new ArrayList<>();
+                long offset = -1L;
                 for (int i = 1; i <= 3; i++) {
-                    long offset = (long) i;
+                    offset = (long) i;
                     int key = i * 10;
                     expectedKeys.add(key);
                     restoreConsumer.bufferRecord(
@@ -283,21 +285,24 @@ public class ProcessorStateManagerTest {
             checkpoint.write(Collections.singletonMap(new TopicPartition("persistentStore", 2), lastCheckpointedOffset));
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
+            restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList(
                     new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0]),
                     new PartitionInfo("nonPersistentStore", 2, Node.noNode(), new Node[0], new Node[0])
             ));
-            restoreConsumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("persistentStore", 2), 13L));
+
+            TopicPartition partition = new TopicPartition("persistentStore", 2);
+            restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L));
 
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false); // non persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(2, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
                 ArrayList<Integer> expectedKeys = new ArrayList<>();
+                long offset = -1L;
                 for (int i = 1; i <= 3; i++) {
-                    long offset = (long) (i + 100);
+                    offset = (long) (i + 100);
                     int key = i;
                     expectedKeys.add(i);
                     restoreConsumer.bufferRecord(
@@ -312,6 +317,7 @@ public class ProcessorStateManagerTest {
                 assertTrue(restoreConsumer.seekToBeginingCalled);
                 assertTrue(restoreConsumer.seekToEndCalled);
                 assertEquals(expectedKeys, nonPersistentStore.keys);
+
             } finally {
                 stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
             }
@@ -321,17 +327,68 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
+    public void testChangeLogOffsets() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            long lastCheckpointedOffset = 10L;
+            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+            checkpoint.write(Collections.singletonMap(new TopicPartition("store1", 0), lastCheckpointedOffset));
+
+            MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+            restoreConsumer.updatePartitions("store1", Utils.mkList(
+                    new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0])
+            ));
+            restoreConsumer.updatePartitions("store2", Utils.mkList(
+                    new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0])
+            ));
+
+            TopicPartition partition1 = new TopicPartition("store1", 0);
+            TopicPartition partition2 = new TopicPartition("store2", 0);
+
+            Map<TopicPartition, Long> endOffsets = new HashMap<>();
+            endOffsets.put(partition1, 13L);
+            endOffsets.put(partition2, 17L);
+            restoreConsumer.updateEndOffsets(endOffsets);
+
+            MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore("store1", true);
+            MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore("store2", true);
+
+            ProcessorStateManager stateMgr = new ProcessorStateManager(0, baseDir, restoreConsumer, true); // standby
+            try {
+                restoreConsumer.reset();
+
+                stateMgr.register(store1, store1.stateRestoreCallback);
+                stateMgr.register(store2, store2.stateRestoreCallback);
+
+                Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointedOffsets();
+
+                assertEquals(2, changeLogOffsets.size());
+                assertTrue(changeLogOffsets.containsKey(partition1));
+                assertTrue(changeLogOffsets.containsKey(partition2));
+                assertEquals(lastCheckpointedOffset, (long) changeLogOffsets.get(partition1));
+                assertEquals(-1L, (long) changeLogOffsets.get(partition2));
+
+            } finally {
+                stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
     public void testGetStore() throws IOException {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("mockStore", Arrays.asList(
+            restoreConsumer.updatePartitions("mockStore", Utils.mkList(
                     new PartitionInfo("mockStore", 1, Node.noNode(), new Node[0], new Node[0])
             ));
 
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore("mockStore", false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false);
             try {
                 stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
 
@@ -356,10 +413,10 @@ public class ProcessorStateManagerTest {
             oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
 
             MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-            restoreConsumer.updatePartitions("persistentStore", Arrays.asList(
+            restoreConsumer.updatePartitions("persistentStore", Utils.mkList(
                     new PartitionInfo("persistentStore", 1, Node.noNode(), new Node[0], new Node[0])
             ));
-            restoreConsumer.updatePartitions("nonPersistentStore", Arrays.asList(
+            restoreConsumer.updatePartitions("nonPersistentStore", Utils.mkList(
                     new PartitionInfo("nonPersistentStore", 1, Node.noNode(), new Node[0], new Node[0])
             ));
 
@@ -372,7 +429,7 @@ public class ProcessorStateManagerTest {
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true);
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore("nonPersistentStore", false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer, false);
             try {
                 // make sure the checkpoint file is deleted
                 assertFalse(checkpointFile.exists());

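The new boolean constructor argument exercised above puts ProcessorStateManager into standby mode, and checkpointedOffsets() then reports, per change-log partition, either the checkpointed offset or -1L when no checkpoint exists. Below is a minimal sketch of how a caller could use that map to position a restore consumer, mirroring the seek loop in StandbyTaskTest further down; the helper class name is made up for illustration.

    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public final class ChangeLogSeekExample {

        // Seek each change-log partition to its checkpointed offset, or to the
        // beginning of the partition when no checkpoint (-1L) was recorded.
        public static void seekToCheckpoints(Consumer<byte[], byte[]> restoreConsumer,
                                             Map<TopicPartition, Long> checkpointedOffsets) {
            for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
                TopicPartition partition = entry.getKey();
                long offset = entry.getValue();
                if (offset >= 0)
                    restoreConsumer.seek(partition, offset);
                else
                    restoreConsumer.seekToBeginning(partition);
            }
        }
    }
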
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
new file mode 100644
index 0000000..b8a6990
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class StandbyTaskTest {
+
+    private final TaskId taskId = new TaskId(0, 1);
+
+    private final Serializer<Integer> intSerializer = new IntegerSerializer();
+
+    private final TopicPartition partition1 = new TopicPartition("store1", 1);
+    private final TopicPartition partition2 = new TopicPartition("store2", 1);
+
+    private final ProcessorTopology topology = new ProcessorTopology(
+            Collections.<ProcessorNode>emptyList(),
+            Collections.<String, SourceNode>emptyMap(),
+            Utils.<StateStoreSupplier>mkList(
+                    new MockStateStoreSupplier(partition1.topic(), false),
+                    new MockStateStoreSupplier(partition2.topic(), true)
+            )
+    );
+
+
+    private StreamingConfig createConfig(final File baseDir) throws Exception {
+        return new StreamingConfig(new Properties() {
+            {
+                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+            }
+        });
+    }
+
+    private final ProcessorStateManagerTest.MockRestoreConsumer restoreStateConsumer = new ProcessorStateManagerTest.MockRestoreConsumer();
+
+    private final byte[] recordValue = intSerializer.serialize(null, 10);
+    private final byte[] recordKey = intSerializer.serialize(null, 1);
+
+    @Before
+    public void setup() {
+        restoreStateConsumer.reset();
+        restoreStateConsumer.updatePartitions("store1", Utils.mkList(
+                new PartitionInfo("store1", 0, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo("store1", 1, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo("store1", 2, Node.noNode(), new Node[0], new Node[0])
+        ));
+
+        restoreStateConsumer.updatePartitions("store2", Utils.mkList(
+                new PartitionInfo("store2", 0, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo("store2", 1, Node.noNode(), new Node[0], new Node[0]),
+                new PartitionInfo("store2", 2, Node.noNode(), new Node[0], new Node[0])
+        ));
+    }
+
+    @Test
+    public void testStorePartitions() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            StreamingConfig config = createConfig(baseDir);
+            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+
+            assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = Exception.class)
+    public void testUpdateNonPersistentStore() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            StreamingConfig config = createConfig(baseDir);
+            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+
+            restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+            task.update(partition1,
+                    records(new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue))
+            );
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testUpdate() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            StreamingConfig config = createConfig(baseDir);
+            StandbyTask task = new StandbyTask(taskId, restoreStateConsumer, topology, config, null);
+
+            restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
+
+            for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 10, 1, 100),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 20, 2, 100),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 30, 3, 100))) {
+                restoreStateConsumer.bufferRecord(record);
+            }
+
+            for (Map.Entry<TopicPartition, Long> entry : task.checkpointedOffsets().entrySet()) {
+                TopicPartition partition = entry.getKey();
+                long offset = entry.getValue();
+                if (offset >= 0) {
+                    restoreStateConsumer.seek(partition, offset);
+                } else {
+                    restoreStateConsumer.seekToBeginning(partition);
+                }
+            }
+
+            task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+
+            StandbyContextImpl context = (StandbyContextImpl) task.context();
+            MockStateStoreSupplier.MockStateStore store1 =
+                    (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition1.topic());
+            MockStateStoreSupplier.MockStateStore store2 =
+                    (MockStateStoreSupplier.MockStateStore) context.getStateMgr().getStore(partition2.topic());
+
+            assertEquals(Collections.emptyList(), store1.keys);
+            assertEquals(Utils.mkList(1, 2, 3), store2.keys);
+
+            task.close();
+
+            File taskDir = new File(baseDir, taskId.toString());
+            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
+            Map<TopicPartition, Long> offsets = checkpoint.read();
+
+            assertEquals(1, offsets.size());
+            assertEquals(new Long(30L + 1L), offsets.get(partition2));
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
+        return Arrays.asList(recs);
+    }
+}

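Putting the pieces of the test above together, here is a rough sketch of how polled change-log records could be fed into standby tasks. This is not the StreamThread code from this patch: the tasksByPartition lookup map is hypothetical, and the class sits in the internals package only so the sketch can reach StandbyTask.update().

    package org.apache.kafka.streams.processor.internals;

    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    public final class StandbyUpdateExample {

        // Drain whatever change-log records are already buffered (timeout 0 keeps
        // the call non-blocking) and hand each partition's batch to the standby
        // task that maintains the corresponding store.
        public static void updateStandbyTasks(Consumer<byte[], byte[]> restoreConsumer,
                                              Map<TopicPartition, StandbyTask> tasksByPartition) {
            ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
            for (TopicPartition partition : records.partitions()) {
                StandbyTask task = tasksByPartition.get(partition);
                if (task != null)
                    task.update(partition, records.records(partition));
            }
        }
    }
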
http://git-wip-us.apache.org/repos/asf/kafka/blob/45e7f713/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 54d0a18..02d0ac7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -167,7 +167,7 @@ public class StreamThreadTest {
             }
         };
 
-        initPartitionGrouper(thread);
+        initPartitionGrouper(config, thread);
 
         ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
 
@@ -292,7 +292,7 @@ public class StreamThreadTest {
                 }
             };
 
-            initPartitionGrouper(thread);
+            initPartitionGrouper(config, thread);
 
             ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
 
@@ -414,7 +414,7 @@ public class StreamThreadTest {
                 }
             };
 
-            initPartitionGrouper(thread);
+            initPartitionGrouper(config, thread);
 
             ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
 
@@ -467,13 +467,11 @@ public class StreamThreadTest {
         }
     }
 
-    private void initPartitionGrouper(StreamThread thread) {
+    private void initPartitionGrouper(StreamingConfig config, StreamThread thread) {
 
         KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor();
 
-        partitionAssignor.configure(
-                Collections.singletonMap(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread)
-        );
+        partitionAssignor.configure(config.getConsumerConfigs(thread));
 
         Map<String, PartitionAssignor.Assignment> assignments =
                 partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));