Posted to commits@kafka.apache.org by gw...@apache.org on 2015/12/08 00:12:24 UTC

[2/2] kafka git commit: KAFKA-2804: manage changelog topics through ZK in PartitionAssignor

KAFKA-2804: manage changelog topics through ZK in PartitionAssignor

Author: Guozhang Wang <wa...@gmail.com>
Author: wangguoz@gmail.com <gu...@Guozhang-Macbook.local>
Author: Guozhang Wang <gu...@Guozhang-Macbook.local>

Reviewers: Yasuhiro Matsuda

Closes #579 from guozhangwang/K2804
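
This patch moves changelog topic management into the partition assignor: during
assignment, changelog topics for internal state stores are created, expanded, or
re-created through ZooKeeper (a stopgap until KIP-4 exposes admin APIs), named from
the job id, the store name, and a changelog suffix. Two new configs drive this,
job.id and zookeeper.connect. A minimal sketch of configuring a job against them
(values illustrative, mirroring the updated ProcessorJob example below):

    Properties props = new Properties();
    props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor");          // new in this commit
    props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");  // new in this commit
    // ... key/value (de)serializer classes as in the examples below ...
    StreamingConfig config = new StreamingConfig(props);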


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d05fa0a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d05fa0a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d05fa0a0

Branch: refs/heads/trunk
Commit: d05fa0a03bc9bcfcff8d73cbf1b22832ebdb75a2
Parents: 23f36c5
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Dec 7 15:12:09 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Dec 7 15:12:09 2015 -0800

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |   2 +-
 build.gradle                                    |   2 +
 checkstyle/import-control.xml                   |   8 +
 .../apache/kafka/streams/KafkaStreaming.java    |  40 +--
 .../apache/kafka/streams/StreamingConfig.java   |  48 +++-
 .../kafka/streams/examples/KStreamJob.java      |   2 +-
 .../kafka/streams/examples/ProcessorJob.java    |   8 +-
 .../streams/kstream/SlidingWindowSupplier.java  |   2 +-
 .../streams/kstream/internals/KTableImpl.java   |   3 +-
 .../processor/DefaultPartitionGrouper.java      |   7 +-
 .../streams/processor/PartitionGrouper.java     |  31 +--
 .../streams/processor/ProcessorContext.java     |   2 +-
 .../streams/processor/TopologyBuilder.java      | 139 +++++++---
 .../processor/internals/AbstractTask.java       |   3 +-
 .../KafkaStreamingPartitionAssignor.java        | 270 +++++++++++++++++--
 .../internals/ProcessorContextImpl.java         |   4 +-
 .../internals/ProcessorStateManager.java        |  63 +++--
 .../processor/internals/StandbyContextImpl.java |   4 +-
 .../processor/internals/StandbyTask.java        |   8 +-
 .../streams/processor/internals/StreamTask.java |   8 +-
 .../processor/internals/StreamThread.java       |  43 ++-
 .../internals/assignment/SubscriptionInfo.java  |  32 +--
 .../streams/state/MeteredKeyValueStore.java     |   2 +-
 .../kafka/streams/state/RocksDBStore.java       |   1 -
 .../kafka/streams/StreamingConfigTest.java      |  21 +-
 .../processor/DefaultPartitionGrouperTest.java  |  32 ++-
 .../streams/processor/TopologyBuilderTest.java  |  31 ++-
 .../KafkaStreamingPartitionAssignorTest.java    | 201 +++++++++++---
 .../internals/ProcessorStateManagerTest.java    | 152 ++++++-----
 .../processor/internals/StandbyTaskTest.java    |  46 ++--
 .../processor/internals/StreamTaskTest.java     |   4 +-
 .../processor/internals/StreamThreadTest.java   |  15 +-
 .../assignment/SubscriptionInfoTest.java        |   5 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   2 +-
 .../apache/kafka/test/MockProcessorContext.java |   2 +-
 .../kafka/test/MockStateStoreSupplier.java      |   2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   8 +-
 .../apache/kafka/test/UnlimitedWindowDef.java   |   2 +-
 38 files changed, 899 insertions(+), 356 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fcf442b..2551338 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -57,7 +57,7 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/stream/build/libs/kafka-streams*.jar;
+for file in $base_dir/streams/build/libs/kafka-streams*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e24279e..224f550 100644
--- a/build.gradle
+++ b/build.gradle
@@ -561,6 +561,8 @@ project(':streams') {
         compile project(':clients')
         compile "$slf4jlog4j"
         compile 'org.rocksdb:rocksdbjni:3.10.1'
+        compile 'com.101tec:zkclient:0.7' // this dependency should be removed after KIP-4
+        compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"  // this dependency should be removed after KIP-4
 
         testCompile "$junit"
         testCompile project(path: ':clients', configuration: 'archives')

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e221dce..a65a2dc 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -129,6 +129,14 @@
     <subpackage name="state">
       <allow pkg="org.rocksdb" />
     </subpackage>
+
+    <subpackage name="processor">
+      <subpackage name="internals">
+        <allow pkg="org.I0Itec.zkclient" />
+        <allow pkg="com.fasterxml.jackson" />
+        <allow pkg="org.apache.zookeeper" />
+      </subpackage>
+    </subpackage>
   </subpackage>
 
   <subpackage name="log4jappender">

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
index fc1fdae..0d99739 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
@@ -73,9 +73,7 @@ public class KafkaStreaming {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
     private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
-    private static final String JMX_PREFIX = "kafka.streaming";
-
-    private final Time time;
+    private static final String JMX_PREFIX = "kafka.streams";
 
     // container states
     private static final int CREATED = 0;
@@ -85,29 +83,39 @@ public class KafkaStreaming {
 
     private final StreamThread[] threads;
 
-    private String clientId;
-    private final UUID uuid;
-    private final Metrics metrics;
+    // processId is expected to be unique across JVMs and to be used
+    // in the userData of the subscription request to allow the assignor
+    // to be aware of the co-location of stream threads' consumers. It is
+    // for internal usage only and should not be exposed to users at all.
+    private final UUID processId;
 
     public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
         // create the metrics
-        this.time = new SystemTime();
-        this.uuid = UUID.randomUUID();
+        Time time = new SystemTime();
 
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
-            .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                TimeUnit.MILLISECONDS);
-        clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
+        this.processId = UUID.randomUUID();
+
+        String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG);
+        if (jobId.length() <= 0)
+            jobId = "kafka-streams";
+
+        String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
         if (clientId.length() <= 0)
-            clientId = "streaming-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
+            clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
+
         List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
-            MetricsReporter.class);
+                MetricsReporter.class);
         reporters.add(new JmxReporter(JMX_PREFIX));
-        this.metrics = new Metrics(metricConfig, reporters, time);
+
+        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                TimeUnit.MILLISECONDS);
+
+        Metrics metrics = new Metrics(metricConfig, reporters, time);
 
         this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config, this.clientId, this.uuid, this.metrics, this.time);
+            this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index 437afd8..e89d030 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -42,6 +42,10 @@ public class StreamingConfig extends AbstractConfig {
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store.";
 
+    /** <code>zookeeper.connect</code> */
+    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
+    private static final String ZOOKEEPER_CONNECT_DOC = "ZooKeeper connect string for Kafka topic management.";
+
     /** <code>commit.interval.ms</code> */
     public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
     private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
@@ -83,8 +87,9 @@ public class StreamingConfig extends AbstractConfig {
     public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
     private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
 
-    /** <code>client.id</code> */
-    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+    /** <code>job.id</code> */
+    public static final String JOB_ID_CONFIG = "job.id";
+    public static final String JOB_ID_DOC = "An id string used to identify the stream job. It serves as 1) the default client-id prefix, 2) the group-id for membership management, and 3) the changelog topic prefix.";
 
     /** <code>key.serializer</code> */
     public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -107,19 +112,30 @@ public class StreamingConfig extends AbstractConfig {
     /** <code>metric.reporters</code> */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
-    /**
-     * <code>bootstrap.servers</code>
-     */
+    /** <code>bootstrap.servers</code> */
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
+    /** <code>client.id</code> */
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
     private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
 
     static {
-        CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG,
+        CONFIG = new ConfigDef().define(JOB_ID_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.MEDIUM,
+                                        StreamingConfig.JOB_ID_DOC)
+                                .define(CLIENT_ID_CONFIG,
                                         Type.STRING,
                                         "",
                                         Importance.MEDIUM,
                                         CommonClientConfigs.CLIENT_ID_DOC)
+                                .define(ZOOKEEPER_CONNECT_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.HIGH,
+                                        StreamingConfig.ZOOKEEPER_CONNECT_DOC)
                                 .define(STATE_DIR_CONFIG,
                                         Type.STRING,
                                         SYSTEM_TEMP_DIRECTORY,
@@ -221,20 +237,27 @@ public class StreamingConfig extends AbstractConfig {
         super(CONFIG, props);
     }
 
-    public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
+    public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
         Map<String, Object> props = getBaseConsumerConfigs();
+
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
         props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
-        props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
         props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
+
+        props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+
         return props;
     }
 
-    public Map<String, Object> getRestoreConsumerConfigs() {
+    public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
         Map<String, Object> props = getBaseConsumerConfigs();
 
-        // no group id for a restore consumer
+        // no need to set group id for a restore consumer
         props.remove(ConsumerConfig.GROUP_ID_CONFIG);
 
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+
         return props;
     }
 
@@ -248,11 +271,12 @@ public class StreamingConfig extends AbstractConfig {
         props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
         props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
         props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+        props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
         return props;
     }
 
-    public Map<String, Object> getProducerConfigs() {
+    public Map<String, Object> getProducerConfigs(String clientId) {
         Map<String, Object> props = this.originals();
 
         // set producer default property values
@@ -263,6 +287,8 @@ public class StreamingConfig extends AbstractConfig {
         props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
         props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
 
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
+
         return props;
     }
 

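The config changes above derive distinct client ids for each embedded client by
suffixing a shared base client id, which itself defaults to the job id plus a
sequence number (see the KafkaStreaming constructor earlier in this diff). A small
sketch of the resulting naming scheme, with an assumed job id:

    public class ClientIdNaming {
        public static void main(String[] args) {
            String jobId = "example-processor";  // job.id, assumed
            String clientId = jobId + "-1";      // default when client.id is empty
            System.out.println(clientId + "-consumer");          // main consumer
            System.out.println(clientId + "-restore-consumer");  // restore consumer (no group id)
            System.out.println(clientId + "-producer");          // producer
        }
    }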
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index 87368c1..819bd68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -34,7 +34,7 @@ public class KStreamJob {
 
     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
-        props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job");
+        props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream");
         props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 882c7ed..2d0b79f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -49,7 +49,7 @@ public class ProcessorJob {
                 public void init(ProcessorContext context) {
                     this.context = context;
                     this.context.schedule(1000);
-                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
+                    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("LOCAL-STATE");
                 }
 
                 @Override
@@ -90,8 +90,9 @@ public class ProcessorJob {
 
     public static void main(String[] args) throws Exception {
         Properties props = new Properties();
-        props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
+        props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor");
         props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@@ -104,8 +105,7 @@ public class ProcessorJob {
         builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
 
         builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
-        builder.addStateStore(Stores.create("local-state").withStringKeys().withIntegerValues().inMemory().build());
-        builder.connectProcessorAndStateStores("local-state", "PROCESS");
+        builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS");
 
         builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
index 0cf969f..80e548f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
@@ -85,7 +85,7 @@ public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
             this.context = context;
             this.partition = context.id().partition;
             SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
-            context.register(this, restoreFunc);
+            context.register(this, true, restoreFunc);
 
             for (ValueList<V> valueList : map.values()) {
                 valueList.clearDirtyValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 5b2b031..47c9b09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -177,7 +177,8 @@ public class KTableImpl<K, S, V> implements KTable<K, V> {
                 if (!source.isMaterialized()) {
                     StateStoreSupplier storeSupplier =
                             new KTableStoreSupplier(topic, keySerializer, keyDeserializer, valSerializer, valDeserializer, null);
-                    topology.addStateStore(storeSupplier, name);
+                    // mark this store as non-internal since it is read directly from a user topic
+                    topology.addStateStore(storeSupplier, false, name);
                     source.materialize();
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 7d2188a..923a217 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -29,9 +29,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class DefaultPartitionGrouper extends PartitionGrouper {
+public class DefaultPartitionGrouper implements PartitionGrouper {
 
-    public Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata) {
+    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
         Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
 
         for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
@@ -71,3 +71,6 @@ public class DefaultPartitionGrouper extends PartitionGrouper {
     }
 
 }
+
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 187c4ce..a40a1c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -19,39 +19,18 @@ package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
 
 import java.util.Map;
 import java.util.Set;
 
-public abstract class PartitionGrouper {
-
-    protected Map<Integer, Set<String>> topicGroups;
-
-    private KafkaStreamingPartitionAssignor partitionAssignor = null;
+public interface PartitionGrouper {
 
     /**
      * Returns a map of task ids to groups of partitions.
      *
-     * @param metadata
+     * @param topicGroups The subscribed topic groups
+     * @param metadata Metadata of the consuming cluster
      * @return a map of task ids to groups of partitions
      */
-    public abstract Map<TaskId, Set<TopicPartition>> partitionGroups(Cluster metadata);
-
-    public void topicGroups(Map<Integer, Set<String>> topicGroups) {
-        this.topicGroups = topicGroups;
-    }
-
-    public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
-        this.partitionAssignor = partitionAssignor;
-    }
-
-    public Set<TaskId> taskIds(TopicPartition partition) {
-        return partitionAssignor.taskIds(partition);
-    }
-
-    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
-        return partitionAssignor.standbyTasks();
-    }
-
-}
+    Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata);
+}
\ No newline at end of file

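PartitionGrouper is now a stateless interface that receives the subscribed topic
groups explicitly instead of holding them as mutable state. A minimal sketch of a
custom grouper under the new signature (the class is hypothetical; it mirrors the
one-task-per-partition shape of DefaultPartitionGrouper above):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.PartitionGrouper;
    import org.apache.kafka.streams.processor.TaskId;

    public class PerPartitionGrouper implements PartitionGrouper {
        @Override
        public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
            Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
            for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
                for (String topic : entry.getValue()) {
                    Integer partitions = metadata.partitionCountForTopic(topic);
                    if (partitions == null)
                        continue; // topic not present in the cluster metadata yet
                    for (int p = 0; p < partitions; p++) {
                        // task ids combine the topic group id with the partition index
                        TaskId taskId = new TaskId(entry.getKey(), p);
                        Set<TopicPartition> group = groups.get(taskId);
                        if (group == null) {
                            group = new HashSet<>();
                            groups.put(taskId, group);
                        }
                        group.add(new TopicPartition(topic, p));
                    }
                }
            }
            return groups;
        }
    }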
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 88ac64e..fa19ed7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -79,7 +79,7 @@ public interface ProcessorContext {
      *
      * @param store the storage engine
      */
-    void register(StateStore store, StateRestoreCallback stateRestoreCallback);
+    void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);
 
     StateStore getStateStore(String name);
 

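The register() signature change threads a loggingEnabled flag through to the state
manager, controlling whether the store's writes are backed up to a changelog topic.
A hedged sketch of a store registering itself under the new signature (the store
class and its init hook are hypothetical, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateRestoreCallback;
    import org.apache.kafka.streams.processor.StateStore;

    public class SimpleByteStore implements StateStore {
        private final String name;
        private final Map<String, byte[]> entries = new HashMap<>();

        public SimpleByteStore(String name) {
            this.name = name;
        }

        public void init(ProcessorContext context) {
            // loggingEnabled = true: changes are logged to a changelog topic, which
            // the partition assignor can now create and validate through ZooKeeper
            context.register(this, true, new StateRestoreCallback() {
                @Override
                public void restore(byte[] key, byte[] value) {
                    entries.put(new String(key), value); // re-insert restored entries
                }
            });
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }

        @Override
        public boolean persistent() {
            return false;
        }
    }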
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 021a47f..3cfb22b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -45,14 +45,17 @@ import java.util.Set;
  * its child nodes. A {@link Processor processor} is a node in the graph that receives input messages from upstream nodes,
 * processes those messages, and optionally forwards new messages to one or all of its children. Finally, a {@link SinkNode sink}
  * is a node in the graph that receives messages from upstream nodes and writes them to a Kafka topic. This builder allows you
- * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link KafkaStreaming} instance
- * that will then {@link KafkaStreaming#start() begin consuming, processing, and producing messages}.
+ * to construct an acyclic graph of these nodes, and the builder is then passed into a new {@link org.apache.kafka.streams.KafkaStreaming}
+ * instance that will then {@link org.apache.kafka.streams.KafkaStreaming#start() begin consuming, processing, and producing messages}.
  */
 public class TopologyBuilder {
 
     // node factories in a topological order
     private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap<>();
 
+    // state factories
+    private final Map<String, StateStoreFactory> stateFactories = new HashMap<>();
+
     private final Set<String> sourceTopicNames = new HashSet<>();
 
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
@@ -60,8 +63,18 @@ public class TopologyBuilder {
     private final HashMap<String, String[]> nodeToTopics = new HashMap<>();
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    private Map<String, StateStoreSupplier> stateStores = new HashMap<>();
-    private Map<String, Set<String>> stateStoreUsers = new HashMap();
+    private static class StateStoreFactory {
+        public final Set<String> users;
+
+        public final boolean isInternal;
+        public final StateStoreSupplier supplier;
+
+        StateStoreFactory(boolean isInternal, StateStoreSupplier supplier) {
+            this.isInternal = isInternal;
+            this.supplier = supplier;
+            this.users = new HashSet<>();
+        }
+    }
 
     private static abstract class NodeFactory {
         public final String name;
@@ -88,6 +101,7 @@ public class TopologyBuilder {
             stateStoreNames.add(stateStoreName);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build() {
             return new ProcessorNode(name, supplier.get(), stateStoreNames);
@@ -106,6 +120,7 @@ public class TopologyBuilder {
             this.valDeserializer = valDeserializer;
         }
 
+        @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build() {
             return new SourceNode(name, keyDeserializer, valDeserializer);
@@ -125,12 +140,40 @@ public class TopologyBuilder {
             this.keySerializer = keySerializer;
             this.valSerializer = valSerializer;
         }
+
+        @SuppressWarnings("unchecked")
         @Override
         public ProcessorNode build() {
             return new SinkNode(name, topic, keySerializer, valSerializer);
         }
     }
 
+    public static class TopicsInfo {
+        public Set<String> sourceTopics;
+        public Set<String> stateNames;
+
+        public TopicsInfo(Set<String> sourceTopics, Set<String> stateNames) {
+            this.sourceTopics = sourceTopics;
+            this.stateNames = stateNames;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o instanceof TopicsInfo) {
+                TopicsInfo other = (TopicsInfo) o;
+                return other.sourceTopics.equals(this.sourceTopics) && other.stateNames.equals(this.stateNames);
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            long n = ((long) sourceTopics.hashCode() << 32) | (stateNames.hashCode() & 0xFFFFFFFFL);
+            return (int) (n % 0xFFFFFFFFL);
+        }
+    }
+
     /**
      * Create a new builder.
      */
@@ -138,9 +181,9 @@ public class TopologyBuilder {
 
     /**
      * Add a new source that consumes the named topics and forwards the messages to child processor and/or sink nodes.
-     * The source will use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
-     * {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
-     * {@link StreamingConfig streaming configuration}.
+     * The source will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} and
+     * {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
      *
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
@@ -158,11 +201,11 @@ public class TopologyBuilder {
      * @param name the unique name of the source used to reference this node when
      * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
      * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming messages; may be null if the source
-     * should use the {@link StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
-     * {@link StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_DESERIALIZER_CLASS_CONFIG default key deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
      * @param valDeserializer the {@link Deserializer value deserializer} used when consuming messages; may be null if the source
-     * should use the {@link StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
-     * {@link StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_DESERIALIZER_CLASS_CONFIG default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
      * @param topics the name of one or more Kafka topics that this source is to consume
      * @return this builder instance so methods can be chained together; never null
      */
@@ -186,9 +229,9 @@ public class TopologyBuilder {
 
     /**
      * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic.
-     * The sink will use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
-     * {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamingConfig streaming configuration}.
+     * The sink will use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} and
+     * {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}.
      *
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
@@ -205,11 +248,11 @@ public class TopologyBuilder {
      * @param name the unique name of the sink
      * @param topic the name of the Kafka topic to which this sink should write its messages
      * @param keySerializer the {@link Serializer key serializer} used when consuming messages; may be null if the sink
-     * should use the {@link StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
-     * {@link StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamingConfig#KEY_SERIALIZER_CLASS_CONFIG default key serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
      * @param valSerializer the {@link Serializer value serializer} used when consuming messages; may be null if the sink
-     * should use the {@link StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
-     * {@link StreamingConfig streaming configuration}
+     * should use the {@link org.apache.kafka.streams.StreamingConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the
+     * {@link org.apache.kafka.streams.StreamingConfig streaming configuration}
      * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume
      * and write to its topic
      * @return this builder instance so methods can be chained together; never null
@@ -271,12 +314,12 @@ public class TopologyBuilder {
      * @param supplier the supplier used to obtain this state store {@link StateStore} instance
      * @return this builder instance so methods can be chained together; never null
      */
-    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
-        if (stateStores.containsKey(supplier.name())) {
+    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, boolean isInternal, String... processorNames) {
+        if (stateFactories.containsKey(supplier.name())) {
             throw new TopologyException("StateStore " + supplier.name() + " is already added.");
         }
-        stateStores.put(supplier.name(), supplier);
-        stateStoreUsers.put(supplier.name(), new HashSet<String>());
+
+        stateFactories.put(supplier.name(), new StateStoreFactory(isInternal, supplier));
 
         if (processorNames != null) {
             for (String processorName : processorNames) {
@@ -288,6 +331,16 @@ public class TopologyBuilder {
     }
 
     /**
+     * Adds an internal state store, i.e. one whose changelog topic is managed automatically.
+     *
+     * @param supplier the supplier used to obtain this state store {@link StateStore} instance
+     * @return this builder instance so methods can be chained together; never null
+     */
+    public final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) {
+        return this.addStateStore(supplier, true, processorNames);
+    }
+
+    /**
      * Connects the processor and the state stores
      *
      * @param processorName the name of the processor
@@ -305,22 +358,22 @@ public class TopologyBuilder {
     }
 
     private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
-        if (!stateStores.containsKey(stateStoreName))
+        if (!stateFactories.containsKey(stateStoreName))
             throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
         if (!nodeFactories.containsKey(processorName))
             throw new TopologyException("Processor " + processorName + " is not added yet.");
 
-        Set<String> users = stateStoreUsers.get(stateStoreName);
-        Iterator<String> iter = users.iterator();
+        StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName);
+        Iterator<String> iter = stateStoreFactory.users.iterator();
         if (iter.hasNext()) {
             String user = iter.next();
             nodeGrouper.unite(user, processorName);
         }
-        users.add(processorName);
+        stateStoreFactory.users.add(processorName);
 
-        NodeFactory factory = nodeFactories.get(processorName);
-        if (factory instanceof ProcessorNodeFactory) {
-            ((ProcessorNodeFactory) factory).addStateStore(stateStoreName);
+        NodeFactory nodeFactory = nodeFactories.get(processorName);
+        if (nodeFactory instanceof ProcessorNodeFactory) {
+            ((ProcessorNodeFactory) nodeFactory).addStateStore(stateStoreName);
         } else {
             throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
         }
@@ -332,20 +385,32 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, Set<String>> topicGroups() {
-        Map<Integer, Set<String>> topicGroups = new HashMap<>();
+    public Map<Integer, TopicsInfo> topicGroups() {
+        Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
 
         if (nodeGroups == null)
             nodeGroups = makeNodeGroups();
 
         for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) {
-            Set<String> topicGroup = new HashSet<>();
+            Set<String> sourceTopics = new HashSet<>();
+            Set<String> stateNames = new HashSet<>();
             for (String node : entry.getValue()) {
+                // if the node is a source node, add to the source topics
                 String[] topics = nodeToTopics.get(node);
                 if (topics != null)
-                    topicGroup.addAll(Arrays.asList(topics));
+                    sourceTopics.addAll(Arrays.asList(topics));
+
+                // if the node is connected to a state, add to the state topics
+                for (StateStoreFactory stateFactory : stateFactories.values()) {
+
+                    if (stateFactory.isInternal && stateFactory.users.contains(node)) {
+                        stateNames.add(stateFactory.supplier.name());
+                    }
+                }
             }
-            topicGroups.put(entry.getKey(), Collections.unmodifiableSet(topicGroup));
+            topicGroups.put(entry.getKey(), new TopicsInfo(
+                    Collections.unmodifiableSet(sourceTopics),
+                    Collections.unmodifiableSet(stateNames)));
         }
 
         return Collections.unmodifiableMap(topicGroups);
@@ -431,9 +496,9 @@ public class TopologyBuilder {
 
     /**
      * Build the topology for the specified topic group. This is called automatically when passing this builder into the
-     * {@link KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)} constructor.
+     * {@link org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)} constructor.
      *
-     * @see KafkaStreaming#KafkaStreaming(TopologyBuilder, StreamingConfig)
+     * @see org.apache.kafka.streams.KafkaStreaming#KafkaStreaming(TopologyBuilder, org.apache.kafka.streams.StreamingConfig)
      */
     public ProcessorTopology build(Integer topicGroupId) {
         Set<String> nodeGroup;
@@ -467,7 +532,7 @@ public class TopologyBuilder {
                         }
                         for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
                             if (!stateStoreMap.containsKey(stateStoreName)) {
-                                stateStoreMap.put(stateStoreName, stateStores.get(stateStoreName));
+                                stateStoreMap.put(stateStoreName, stateFactories.get(stateStoreName).supplier);
                             }
                         }
                     } else if (factory instanceof SourceNodeFactory) {

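Tying the TopologyBuilder pieces together: an internal store (the default overload
passes isInternal = true) contributes its name to TopicsInfo.stateNames for its
topic group, so the assignor knows to manage a changelog topic for it, while a
non-internal store (as KTableImpl registers above) is read directly from a user
topic and gets no changelog. A sketch of the builder usage, with a trivial
pass-through processor standing in for real logic:

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.apache.kafka.streams.processor.TopologyBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class TopologySketch {

        // no-op processor, for illustration only
        private static class NoopProcessor implements Processor<String, String> {
            @Override public void init(ProcessorContext context) { }
            @Override public void process(String key, String value) { }
            @Override public void punctuate(long timestamp) { }
            @Override public void close() { }
        }

        public static TopologyBuilder build() {
            TopologyBuilder builder = new TopologyBuilder();
            builder.addSource("SOURCE", "topic-source");
            builder.addProcessor("PROCESS", new ProcessorSupplier<String, String>() {
                @Override
                public Processor<String, String> get() {
                    return new NoopProcessor();
                }
            }, "SOURCE");
            // internal store: its name lands in TopicsInfo.stateNames, so the
            // assignor will manage a changelog topic for it during assignment
            builder.addStateStore(
                    Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(),
                    "PROCESS");
            return builder;
        }
    }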
http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index e1b4d62..b3255bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -43,6 +43,7 @@ public abstract class AbstractTask {
     protected ProcessorContext processorContext;
 
     protected AbstractTask(TaskId id,
+                           String jobId,
                            Collection<TopicPartition> partitions,
                            ProcessorTopology topology,
                            Consumer<byte[], byte[]> consumer,
@@ -58,7 +59,7 @@ public abstract class AbstractTask {
         try {
             File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
             // if partitions is null, this is a standby task
-            this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer, isStandby);
+            this.stateMgr = new ProcessorStateManager(jobId, id.partition, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {
             throw new KafkaException("Error while creating the state manager", e);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
index 54d5567..29c67f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignor.java
@@ -21,9 +21,11 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
@@ -32,7 +34,21 @@ import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.zookeeper.ZooDefs;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.I0Itec.zkclient.ZkClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,10 +62,146 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
     private static final Logger log = LoggerFactory.getLogger(KafkaStreamingPartitionAssignor.class);
 
     private StreamThread streamThread;
+
     private int numStandbyReplicas;
+    private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
     private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
+    private Map<String, Set<TaskId>> stateNameToTaskIds;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
 
+
+    // TODO: the following ZK dependency should be removed after KIP-4
+    private static final String ZK_TOPIC_PATH = "/brokers/topics";
+    private static final String ZK_BROKER_PATH = "/brokers/ids";
+    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
+
+    private ZkClient zkClient;
+
+    private class ZKStringSerializer implements ZkSerializer {
+
+        @Override
+        public byte[] serialize(Object data) {
+            try {
+                return ((String) data).getBytes("UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+
+        @Override
+        public Object deserialize(byte[] bytes) {
+            try {
+                if (bytes == null)
+                    return null;
+                else
+                    return new String(bytes, "UTF-8");
+            } catch (UnsupportedEncodingException e) {
+                throw new AssertionError(e);
+            }
+        }
+    }
+
+    private List<Integer> getBrokers() {
+        List<Integer> brokers = new ArrayList<>();
+        for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
+            brokers.add(Integer.parseInt(broker));
+        }
+        Collections.sort(brokers);
+
+        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
+
+        return brokers;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
+        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
+
+        if (data == null) return null;
+
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+
+            Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
+
+            });
+
+            Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
+
+            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
+
+            return partitions;
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    private void createTopic(String topic, int numPartitions) throws ZkNodeExistsException {
+        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions);
+
+        // we always assign leaders to brokers starting at the first one with replication factor 1
+        List<Integer> brokers = getBrokers();
+
+        Map<Integer, List<Integer>> assignment = new HashMap<>();
+        for (int i = 0; i < numPartitions; i++) {
+            assignment.put(i, Collections.singletonList(brokers.get(i % brokers.size())));
+        }
+
+        // try to write to ZK with open ACL
+        try {
+            Map<String, Object> dataMap = new HashMap<>();
+            dataMap.put("version", 1);
+            dataMap.put("partitions", assignment);
+
+            ObjectMapper mapper = new ObjectMapper();
+            String data = mapper.writeValueAsString(dataMap);
+
+            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic, data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+        } catch (JsonProcessingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    private void deleteTopic(String topic) throws ZkNodeExistsException {
+        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
+
+        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+    }
+
+    private void addPartitions(String topic, int numPartitions, Map<Integer, List<Integer>> existingAssignment) {
+        log.debug("Adding {} partitions to topic {} from ZK with existing partitions assigned as {} in partition assignor.", numPartitions, topic, existingAssignment);
+
+        // we always assign new leaders to brokers starting at the last broker of the existing assignment with replication factor 1
+        List<Integer> brokers = getBrokers();
+
+        int startIndex = existingAssignment.size();
+
+        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
+
+        for (int i = 0; i < numPartitions; i++) {
+            newAssignment.put(i + startIndex, Collections.singletonList(brokers.get((i + startIndex) % brokers.size())));
+        }
+
+        // try to write to ZK with open ACL
+        try {
+            Map<String, Object> dataMap = new HashMap<>();
+            dataMap.put("version", 1);
+            dataMap.put("partitions", newAssignment);
+
+            ObjectMapper mapper = new ObjectMapper();
+            String data = mapper.writeValueAsString(dataMap);
+
+            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
+        } catch (JsonProcessingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * The PartitionAssignor and its StreamThread need to be mutually accessible,
+     * since the former needs the latter's cached metadata while sending subscriptions,
+     * and the latter needs the former's returned assignment when adding tasks.
+     */
     @Override
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
@@ -68,7 +220,12 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         }
 
         streamThread = (StreamThread) o;
-        streamThread.partitionGrouper.partitionAssignor(this);
+        streamThread.partitionAssignor(this);
+
+        this.topicGroups = streamThread.builder.topicGroups();
+
+        if (configs.containsKey(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG))
+            zkClient = new ZkClient((String) configs.get(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG), 30 * 1000, 30 * 1000, new ZKStringSerializer());
     }
 
     @Override
@@ -86,7 +243,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         Set<TaskId> prevTasks = streamThread.prevTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.clientUUID, prevTasks, standbyTasks);
+        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks);
 
         return new Subscription(new ArrayList<>(topics), data.encode());
     }
@@ -112,17 +269,17 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
 
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
 
-            Set<String> consumers = consumersByClient.get(info.clientUUID);
+            Set<String> consumers = consumersByClient.get(info.processId);
             if (consumers == null) {
                 consumers = new HashSet<>();
-                consumersByClient.put(info.clientUUID, consumers);
+                consumersByClient.put(info.processId, consumers);
             }
             consumers.add(consumerId);
 
-            ClientState<TaskId> state = states.get(info.clientUUID);
+            ClientState<TaskId> state = states.get(info.processId);
             if (state == null) {
                 state = new ClientState<>();
-                states.put(info.clientUUID, state);
+                states.put(info.processId, state);
             }
 
             state.prevActiveTasks.addAll(info.prevTasks);
@@ -131,21 +288,40 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
             state.capacity = state.capacity + 1d;
         }
 
-        // Get partition groups from the partition grouper
-        Map<TaskId, Set<TopicPartition>> partitionGroups = streamThread.partitionGrouper.partitionGroups(metadata);
+        // get the tasks as partition groups from the partition grouper
+        Map<Integer, Set<String>> sourceTopicGroups = new HashMap<>();
+        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
+        }
+        Map<TaskId, Set<TopicPartition>> partitionsForTask = streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadata);
+
+        // add tasks to state topic subscribers
+        stateNameToTaskIds = new HashMap<>();
+        for (TaskId task : partitionsForTask.keySet()) {
+            for (String stateName : topicGroups.get(task.topicGroupId).stateNames) {
+                Set<TaskId> tasks = stateNameToTaskIds.get(stateName);
+                if (tasks == null) {
+                    tasks = new HashSet<>();
+                    stateNameToTaskIds.put(stateName, tasks);
+                }
+
+                tasks.add(task);
+            }
+        }
 
-        states = TaskAssignor.assign(states, partitionGroups.keySet(), numStandbyReplicas);
+        // assign tasks to clients
+        states = TaskAssignor.assign(states, partitionsForTask.keySet(), numStandbyReplicas);
         Map<String, Assignment> assignment = new HashMap<>();
 
         for (Map.Entry<UUID, Set<String>> entry : consumersByClient.entrySet()) {
-            UUID uuid = entry.getKey();
+            UUID processId = entry.getKey();
             Set<String> consumers = entry.getValue();
-            ClientState<TaskId> state = states.get(uuid);
+            ClientState<TaskId> state = states.get(processId);
 
             ArrayList<TaskId> taskIds = new ArrayList<>(state.assignedTasks.size());
             final int numActiveTasks = state.activeTasks.size();
-            for (TaskId id : state.activeTasks) {
-                taskIds.add(id);
+            for (TaskId taskId : state.activeTasks) {
+                taskIds.add(taskId);
             }
             for (TaskId id : state.assignedTasks) {
                 if (!state.activeTasks.contains(id))
@@ -164,7 +340,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
                 for (int j = i; j < numTaskIds; j += numConsumers) {
                     TaskId taskId = taskIds.get(j);
                     if (j < numActiveTasks) {
-                        for (TopicPartition partition : partitionGroups.get(taskId)) {
+                        for (TopicPartition partition : partitionsForTask.get(taskId)) {
                             activePartitions.add(partition);
                             active.add(taskId);
                         }
@@ -174,7 +350,7 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
                             standbyPartitions = new HashSet<>();
                             standby.put(taskId, standbyPartitions);
                         }
-                        standbyPartitions.addAll(partitionGroups.get(taskId));
+                        standbyPartitions.addAll(partitionsForTask.get(taskId));
                     }
                 }
 
@@ -187,6 +363,63 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
             }
         }
 
+        // if a ZK client is configured, look up the tasks for each state topic and validate its changelog partitions
+        if (zkClient != null) {
+            log.debug("Starting to validate changelog topics in partition assignor.");
+
+            for (Map.Entry<String, Set<TaskId>> entry : stateNameToTaskIds.entrySet()) {
+                String topic = streamThread.jobId + "-" + entry.getKey() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX;
+
+                // the expected number of partitions is the max value of TaskId.partition + 1
+                int numPartitions = 0;
+                for (TaskId task : entry.getValue()) {
+                    if (numPartitions < task.partition + 1)
+                        numPartitions = task.partition + 1;
+                }
+
+                boolean topicNotReady = true;
+
+                while (topicNotReady) {
+                    Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic);
+
+                    // if topic does not exist, create it
+                    if (topicMetadata == null) {
+                        try {
+                            createTopic(topic, numPartitions);
+                        } catch (ZkNodeExistsException e) {
+                            // ignore and continue
+                        }
+                    } else {
+                        if (topicMetadata.size() > numPartitions) {
+                            // the topic exists with more partitions than needed; delete it so it can be re-created
+                            try {
+                                deleteTopic(topic);
+                            } catch (ZkNodeExistsException e) {
+                                // ignore and continue
+                            }
+                        } else if (topicMetadata.size() < numPartitions) {
+                            // the topic exists with fewer partitions than needed; add the missing partitions
+                            try {
+                                addPartitions(topic, numPartitions - topicMetadata.size(), topicMetadata);
+                            } catch (ZkNoNodeException e) {
+                                // ignore and continue
+                            }
+                        }
+
+                        topicNotReady = false;
+                    }
+                }
+
+                // wait until the topic metadata has been propagated to all brokers
+                List<PartitionInfo> partitions;
+                do {
+                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                } while (partitions == null || partitions.size() != numPartitions);
+            }
+
+            log.info("Completed validating changelog topics in partition assignor.");
+        }
+
         return assignment;
     }
 
@@ -220,7 +453,12 @@ public class KafkaStreamingPartitionAssignor implements PartitionAssignor, Confi
         this.partitionToTaskIds = partitionToTaskIds;
     }
 
-    public Set<TaskId> taskIds(TopicPartition partition) {
+    /* For Test Only */
+    public Set<TaskId> tasksForState(String stateName) {
+        return stateNameToTaskIds.get(stateName);
+    }
+
+    public Set<TaskId> tasksForPartition(TopicPartition partition) {
         return partitionToTaskIds.get(partition);
     }
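
For orientation on the ZK-backed validation added above: the expected partition count of a store's changelog topic is max(TaskId.partition) + 1 over the tasks that use the store, and the topic is created, deleted for re-creation, or expanded until the broker metadata matches. The following is a minimal sketch of that reconciliation decision only, not part of the commit; ChangelogAction and the other names here are illustrative stand-ins for the actual ZK helpers (createTopic, deleteTopic, addPartitions) shown in the hunk:

    import java.util.Set;

    // Illustrative only: mirrors the reconciliation logic in assign() above.
    enum ChangelogAction { CREATE, DELETE_AND_RECREATE, ADD_PARTITIONS, NONE }

    final class ChangelogTopicCheck {
        // the expected number of partitions is max(TaskId.partition) + 1
        static int expectedPartitions(Set<Integer> taskPartitions) {
            int numPartitions = 0;
            for (int partition : taskPartitions)
                numPartitions = Math.max(numPartitions, partition + 1);
            return numPartitions;
        }

        // decide how to reconcile the existing topic (null means the topic is missing)
        static ChangelogAction decide(Integer existingPartitions, int expected) {
            if (existingPartitions == null)
                return ChangelogAction.CREATE;
            if (existingPartitions > expected)
                return ChangelogAction.DELETE_AND_RECREATE;
            if (existingPartitions < expected)
                return ChangelogAction.ADD_PARTITIONS;
            return ChangelogAction.NONE;
        }
    }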
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 1321cc5..3429df3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -118,11 +118,11 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
+    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
         if (initialized)
             throw new KafkaException("Can only create state stores during initialization.");
 
-        stateMgr.register(store, stateRestoreCallback);
+        stateMgr.register(store, loggingEnabled, stateRestoreCallback);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 4cff02d..579d245 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -19,10 +19,11 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.OffsetCheckpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,9 +44,11 @@ public class ProcessorStateManager {
 
     private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
 
+    public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
     public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
     public static final String LOCK_FILE_NAME = ".lock";
 
+    private final String jobId;
     private final int partition;
     private final File baseDir;
     private final FileLock directoryLock;
@@ -55,9 +58,10 @@ public class ProcessorStateManager {
     private final Map<TopicPartition, Long> checkpointedOffsets;
     private final Map<TopicPartition, Long> offsetLimits;
     private final boolean isStandby;
-    private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks
+    private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
 
-    public ProcessorStateManager(int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+    public ProcessorStateManager(String jobId, int partition, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+        this.jobId = jobId;
         this.partition = partition;
         this.baseDir = baseDir;
         this.stores = new HashMap<>();
@@ -90,6 +94,10 @@ public class ProcessorStateManager {
         }
     }
 
+    public static String storeChangelogTopic(String jobId, String storeName) {
+        return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
+    }
+
     public static FileLock lockStateDirectory(File stateDir) throws IOException {
         File lockFile = new File(stateDir, ProcessorStateManager.LOCK_FILE_NAME);
         FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
@@ -104,7 +112,7 @@ public class ProcessorStateManager {
         return this.baseDir;
     }
 
-    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
+    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
         if (store.name().equals(CHECKPOINT_FILE_NAME))
             throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
 
@@ -112,44 +120,52 @@ public class ProcessorStateManager {
             throw new IllegalArgumentException("Store " + store.name() + " has already been registered.");
 
         // check whether the underlying changelog topic exists
-        if (restoreConsumer.listTopics().containsKey(store.name())) {
-            boolean partitionNotFound = true;
-            for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(store.name())) {
+        String topic;
+        if (loggingEnabled)
+            topic = storeChangelogTopic(this.jobId, store.name());
+        else topic = store.name();
+
+        // block until the partition for this state changelog topic is ready, or until the wait time has elapsed
+        boolean partitionNotFound = true;
+        long startTime = System.currentTimeMillis();
+        long waitTime = 5000L;      // hard-coded for now; this blocking wait should become unnecessary after KIP-4
+
+        do {
+            try {
+                Thread.sleep(50L);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+
+            for (PartitionInfo partitionInfo : restoreConsumer.partitionsFor(topic)) {
                 if (partitionInfo.partition() == partition) {
                     partitionNotFound = false;
                     break;
                 }
             }
+        } while (partitionNotFound && System.currentTimeMillis() < startTime + waitTime);
 
-            if (partitionNotFound)
-                throw new IllegalStateException("Store " + store.name() + "'s change log does not contain the partition " + partition);
-
-        } else {
-            throw new IllegalStateException("Change log topic for store " + store.name() + " does not exist yet");
-        }
+        if (partitionNotFound)
+            throw new KafkaException("Store " + store.name() + "'s change log does not contain partition " + partition);
 
         this.stores.put(store.name(), store);
 
         if (isStandby) {
             if (store.persistent())
-                restoreCallbacks.put(store.name(), stateRestoreCallback);
+                restoreCallbacks.put(topic, stateRestoreCallback);
         } else {
             restoreActiveState(store, stateRestoreCallback);
         }
     }
 
     private void restoreActiveState(StateStore store, StateRestoreCallback stateRestoreCallback) {
-
-        if (store == null)
-            throw new IllegalArgumentException("Store " + store.name() + " has not been registered.");
-
         // ---- try to restore the state from change-log ---- //
 
         // subscribe to the store's partition
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
         }
-        TopicPartition storePartition = new TopicPartition(store.name(), partition);
+        TopicPartition storePartition = new TopicPartition(storeChangelogTopic(this.jobId, store.name()), partition);
         restoreConsumer.assign(Collections.singletonList(storePartition));
 
         try {
@@ -195,8 +211,8 @@ public class ProcessorStateManager {
         Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
 
         for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
-            String storeName = entry.getKey();
-            TopicPartition storePartition = new TopicPartition(storeName, partition);
+            String topicName = entry.getKey();
+            TopicPartition storePartition = new TopicPartition(topicName, partition);
 
             if (checkpointedOffsets.containsKey(storePartition)) {
                 partitionsAndOffsets.put(storePartition, checkpointedOffsets.get(storePartition));
@@ -212,6 +228,7 @@ public class ProcessorStateManager {
         List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
 
         // restore states from changelog records
+
         StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());
 
         long lastOffset = -1L;
@@ -276,7 +293,7 @@ public class ProcessorStateManager {
 
             Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
             for (String storeName : stores.keySet()) {
-                TopicPartition part = new TopicPartition(storeName, partition);
+                TopicPartition part = new TopicPartition(storeChangelogTopic(jobId, storeName), partition);
 
                 // only checkpoint the offset to the offsets file if it is persistent;
                 if (stores.get(storeName).persistent()) {
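
To make the new changelog naming concrete: store changelog topics are now namespaced by the job id through storeChangelogTopic(). A tiny illustration, where "my-job" and "counts" are made-up values:

    // Illustrative only: "my-job" and "counts" are hypothetical ids.
    String topic = ProcessorStateManager.storeChangelogTopic("my-job", "counts");
    // topic == "my-job-counts-changelog"

Note also that register() now polls restoreConsumer.partitionsFor(topic) for up to five seconds instead of failing immediately, since the changelog topic may still be propagating after the assignor creates it.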

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index ea95300..e0583e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -110,11 +110,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     }
 
     @Override
-    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
+    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
         if (initialized)
             throw new KafkaException("Can only create state stores during initialization.");
 
-        stateMgr.register(store, stateRestoreCallback);
+        stateMgr.register(store, loggingEnabled, stateRestoreCallback);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index d0d8493..4cc4ea4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -45,19 +45,23 @@ public class StandbyTask extends AbstractTask {
      * Create {@link StandbyTask} with its assigned partitions
      *
      * @param id                    the ID of this task
-     * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
+     * @param jobId                 the ID of the job
+     * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
+     * @param consumer              the instance of {@link Consumer}
+     * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
      * @param config                the {@link StreamingConfig} specified by the user
      * @param metrics               the {@link StreamingMetrics} created by the thread
      */
     public StandbyTask(TaskId id,
+                       String jobId,
                        Collection<TopicPartition> partitions,
                        ProcessorTopology topology,
                        Consumer<byte[], byte[]> consumer,
                        Consumer<byte[], byte[]> restoreConsumer,
                        StreamingConfig config,
                        StreamingMetrics metrics) {
-        super(id, partitions, topology, consumer, restoreConsumer, config, true);
+        super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
 
         // initialize the topology with its own context
         this.processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 24c450e..2e58ad5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -61,15 +61,17 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * Create {@link StreamTask} with its assigned partitions
      *
      * @param id                    the ID of this task
+     * @param jobId                 the ID of the job
+     * @param partitions            the collection of assigned {@link TopicPartition}
+     * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
      * @param producer              the instance of {@link Producer}
      * @param restoreConsumer       the instance of {@link Consumer} used when restoring state
-     * @param partitions            the collection of assigned {@link TopicPartition}
-     * @param topology              the instance of {@link ProcessorTopology}
      * @param config                the {@link StreamingConfig} specified by the user
      * @param metrics               the {@link StreamingMetrics} created by the thread
      */
     public StreamTask(TaskId id,
+                      String jobId,
                       Collection<TopicPartition> partitions,
                       ProcessorTopology topology,
                       Consumer<byte[], byte[]> consumer,
@@ -77,7 +79,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       Consumer<byte[], byte[]> restoreConsumer,
                       StreamingConfig config,
                       StreamingMetrics metrics) {
-        super(id, partitions, topology, consumer, restoreConsumer, config, false);
+        super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c77a027..4d1ef43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -70,7 +70,9 @@ public class StreamThread extends Thread {
     private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     public final PartitionGrouper partitionGrouper;
-    public final UUID clientUUID;
+    public final String jobId;
+    public final String clientId;
+    public final UUID processId;
 
     protected final StreamingConfig config;
     protected final TopologyBuilder builder;
@@ -83,7 +85,6 @@ public class StreamThread extends Thread {
     private final Map<TaskId, StreamTask> activeTasks;
     private final Map<TaskId, StandbyTask> standbyTasks;
     private final Set<TaskId> prevTasks;
-    private final String clientId;
     private final Time time;
     private final File stateDir;
     private final long pollTimeMs;
@@ -92,6 +93,8 @@ public class StreamThread extends Thread {
     private final long totalRecordsToProcess;
     private final StreamingMetricsImpl sensors;
 
+    private KafkaStreamingPartitionAssignor partitionAssignor = null;
+
     private long lastClean;
     private long lastCommit;
     private long recordsProcessed;
@@ -118,11 +121,12 @@ public class StreamThread extends Thread {
 
     public StreamThread(TopologyBuilder builder,
                         StreamingConfig config,
+                        String jobId,
                         String clientId,
-                        UUID clientUUID,
+                        UUID processId,
                         Metrics metrics,
                         Time time) throws Exception {
-        this(builder, config, null , null, null, clientId, clientUUID, metrics, time);
+        this(builder, config, null, null, null, jobId, clientId, processId, metrics, time);
     }
 
     StreamThread(TopologyBuilder builder,
@@ -130,19 +134,20 @@ public class StreamThread extends Thread {
                  Producer<byte[], byte[]> producer,
                  Consumer<byte[], byte[]> consumer,
                  Consumer<byte[], byte[]> restoreConsumer,
+                 String jobId,
                  String clientId,
-                 UUID clientUUID,
+                 UUID processId,
                  Metrics metrics,
                  Time time) throws Exception {
         super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
 
+        this.jobId = jobId;
         this.config = config;
         this.builder = builder;
         this.sourceTopics = builder.sourceTopics();
         this.clientId = clientId;
-        this.clientUUID = clientUUID;
+        this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
-        this.partitionGrouper.topicGroups(builder.topicGroups());
 
         // set the producer and consumer clients
         this.producer = (producer != null) ? producer : createProducer();
@@ -175,23 +180,27 @@ public class StreamThread extends Thread {
         this.running = new AtomicBoolean(true);
     }
 
+    public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) {
+        this.partitionAssignor = partitionAssignor;
+    }
+
     private Producer<byte[], byte[]> createProducer() {
         log.info("Creating producer client for stream thread [" + this.getName() + "]");
-        return new KafkaProducer<>(config.getProducerConfigs(),
+        return new KafkaProducer<>(config.getProducerConfigs(this.clientId),
                 new ByteArraySerializer(),
                 new ByteArraySerializer());
     }
 
     private Consumer<byte[], byte[]> createConsumer() {
         log.info("Creating consumer client for stream thread [" + this.getName() + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(this),
+        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }
 
     private Consumer<byte[], byte[]> createRestoreConsumer() {
         log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
-        return new KafkaConsumer<>(config.getRestoreConsumerConfigs(),
+        return new KafkaConsumer<>(config.getRestoreConsumerConfigs(this.clientId),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }
@@ -516,14 +525,17 @@ public class StreamThread extends Thread {
 
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
-        return new StreamTask(id, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
+        return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
+        if (partitionAssignor == null)
+            throw new KafkaException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
+
         HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
 
         for (TopicPartition partition : assignment) {
-            Set<TaskId> taskIds = partitionGrouper.taskIds(partition);
+            Set<TaskId> taskIds = partitionAssignor.tasksForPartition(partition);
             for (TaskId taskId : taskIds) {
                 Set<TopicPartition> partitions = partitionsForTask.get(taskId);
                 if (partitions == null) {
@@ -574,17 +586,20 @@ public class StreamThread extends Thread {
         ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
-            return new StandbyTask(id, partitions, topology, consumer, restoreConsumer, config, sensors);
+            return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
         } else {
             return null;
         }
     }
 
     private void addStandbyTasks() {
+        if (partitionAssignor == null)
+            throw new KafkaException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
+
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
         // create the standby tasks
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionGrouper.standbyTasks().entrySet()) {
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) {
             TaskId taskId = entry.getKey();
             Set<TopicPartition> partitions = entry.getValue();
             StandbyTask task = createStandbyTask(taskId, partitions);
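
The partitionAssignor(...) setter added above is one half of a handshake: the thread passes itself into the consumer configs via getConsumerConfigs(this, ...), and the assignor, once instantiated inside the consumer, can retrieve the thread during configure() and register itself back. A rough sketch of that registration, assuming the thread travels under some internal config key; the key name and the configure() body below are assumptions, not the commit's code:

    // Illustrative only: the config key is hypothetical.
    @Override
    public void configure(Map<String, ?> configs) {
        Object o = configs.get("__stream.thread.instance__");   // hypothetical internal key
        if (o instanceof StreamThread) {
            StreamThread thread = (StreamThread) o;
            thread.partitionAssignor(this);   // hand this assignor back to its owning thread
        }
    }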

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 54042b9..43009a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -30,30 +30,32 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
+    private static final int CURRENT_VERSION = 1;
+
     public final int version;
-    public final UUID clientUUID;
+    public final UUID processId;
     public final Set<TaskId> prevTasks;
     public final Set<TaskId> standbyTasks;
 
-    public SubscriptionInfo(UUID clientUUID, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
-        this(1, clientUUID, prevTasks, standbyTasks);
+    public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+        this(CURRENT_VERSION, processId, prevTasks, standbyTasks);
     }
 
-    private SubscriptionInfo(int version, UUID clientUUID, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
         this.version = version;
-        this.clientUUID = clientUUID;
+        this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
     }
 
     public ByteBuffer encode() {
-        if (version == 1) {
-            ByteBuffer buf = ByteBuffer.allocate(4 + 16 + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
+        if (version == CURRENT_VERSION) {
+            ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
             // version
-            buf.putInt(1);
+            buf.putInt(version);
             // encode client UUID
-            buf.putLong(clientUUID.getMostSignificantBits());
-            buf.putLong(clientUUID.getLeastSignificantBits());
+            buf.putLong(processId.getMostSignificantBits());
+            buf.putLong(processId.getLeastSignificantBits());
             // encode ids of previously running tasks
             buf.putInt(prevTasks.size());
             for (TaskId id : prevTasks) {
@@ -81,9 +83,9 @@ public class SubscriptionInfo {
 
         // Decode version
         int version = data.getInt();
-        if (version == 1) {
+        if (version == CURRENT_VERSION) {
             // Decode client UUID
-            UUID clientUUID = new UUID(data.getLong(), data.getLong());
+            UUID processId = new UUID(data.getLong(), data.getLong());
             // Decode previously active tasks
             Set<TaskId> prevTasks = new HashSet<>();
             int numPrevs = data.getInt();
@@ -98,7 +100,7 @@ public class SubscriptionInfo {
                 standbyTasks.add(TaskId.readFrom(data));
             }
 
-            return new SubscriptionInfo(version, clientUUID, prevTasks, standbyTasks);
+            return new SubscriptionInfo(version, processId, prevTasks, standbyTasks);
 
         } else {
             TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
@@ -109,7 +111,7 @@ public class SubscriptionInfo {
 
     @Override
     public int hashCode() {
-        return version ^ clientUUID.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+        return version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
     }
 
     @Override
@@ -117,7 +119,7 @@ public class SubscriptionInfo {
         if (o instanceof SubscriptionInfo) {
             SubscriptionInfo other = (SubscriptionInfo) o;
             return this.version == other.version &&
-                    this.clientUUID.equals(other.clientUUID) &&
+                    this.processId.equals(other.processId) &&
                     this.prevTasks.equals(other.prevTasks) &&
                     this.standbyTasks.equals(other.standbyTasks);
         } else {
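
Since the subscription wire format is fixed-layout (a 4-byte version, the 16-byte process id, then two length-prefixed lists of 8-byte task ids), a round-trip through encode()/decode() is easy to sanity-check. A minimal sketch; the UUID and task ids are made up, and the explicit rewind() is defensive in case encode() returns the buffer positioned at its end:

    // Illustrative only: a round-trip sanity check, not part of the commit.
    UUID processId = UUID.randomUUID();
    Set<TaskId> prevTasks = new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1)));
    Set<TaskId> standbyTasks = Collections.singleton(new TaskId(1, 0));

    ByteBuffer buf = new SubscriptionInfo(processId, prevTasks, standbyTasks).encode();
    buf.rewind();
    SubscriptionInfo decoded = SubscriptionInfo.decode(buf);

    assert decoded.processId.equals(processId);
    assert decoded.prevTasks.equals(prevTasks);
    assert decoded.standbyTasks.equals(standbyTasks);

(Imports assumed: java.nio.ByteBuffer and java.util.*, plus TaskId and SubscriptionInfo from the streams packages.)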

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 8aed6b8..d75e7e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -88,7 +88,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
             final Deserializer<V> valDeserializer = serialization.valueDeserializer();
 
-            context.register(this, new StateRestoreCallback() {
+            context.register(this, loggingEnabled, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
                     inner.put(keyDeserializer.deserialize(name, key),
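
The register() signature change ripples out to every store implementation: a store's init() now declares whether its writes are logged to a changelog topic. A hedged sketch of what a custom store's init might look like under the new contract; 'inner' and the surrounding class are placeholders for a store's own internals:

    // Illustrative only: a fragment from a hypothetical store class.
    public void init(ProcessorContext context) {
        context.register(this, true /* loggingEnabled */, new StateRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
                inner.put(key, value);   // replay a changelog record into the underlying store
            }
        });
    }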

http://git-wip-us.apache.org/repos/asf/kafka/blob/d05fa0a0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
index 40ca9f5..029d72f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -56,7 +56,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private Serdes<K, V> serdes;
     private ProcessorContext context;
-    private String dbName;
     private String dirName;
     private RocksDB db;