You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/27 04:43:42 UTC

[kafka] branch 0.10.1 updated: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4746)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.10.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.10.1 by this push:
     new faac933  KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4746)
faac933 is described below

commit faac933aeee5d976008a17248650943923728408
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Mar 26 21:43:38 2018 -0700

    KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4746)
    
    Introduces new config parameter `upgrade.from`.
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
 bin/kafka-run-class.sh                             |  38 +++-
 build.gradle                                       |  13 ++
 .../consumer/internals/AbstractCoordinator.java    |   2 +-
 .../apache/kafka/common/protocol/types/Struct.java |   2 +-
 .../authenticator/SaslClientCallbackHandler.java   |  11 +-
 .../kafka/clients/producer/ProducerRecordTest.java |  12 +-
 docs/upgrade.html                                  |  17 ++
 gradle/dependencies.gradle                         |   2 +
 gradle/rat.gradle                                  |  14 +-
 jenkins.sh                                         |  20 +++
 settings.gradle                                    |   4 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  26 ++-
 .../kafka/streams/processor/TopologyBuilder.java   |   2 +-
 .../internals/StreamPartitionAssignor.java         |  24 ++-
 .../internals/assignment/AssignmentInfo.java       |   7 +-
 .../internals/assignment/SubscriptionInfo.java     |   5 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  28 +--
 .../apache/kafka/streams/StreamsConfigTest.java    |  42 +++--
 .../streams/integration/FanoutIntegrationTest.java |   5 +-
 .../KStreamAggregationDedupIntegrationTest.java    |   8 +-
 .../KStreamAggregationIntegrationTest.java         |  29 ++-
 .../streams/integration/ResetIntegrationTest.java  |   2 +-
 .../internals/StreamPartitionAssignorTest.java     |  97 ++++++++--
 .../processor/internals/StreamThreadTest.java      |  12 +-
 .../internals/assignment/AssignmentInfoTest.java   |   3 +-
 .../kafka/streams/smoketest/SmokeTestClient.java   |  10 +-
 .../kafka/streams/smoketest/SmokeTestDriver.java   |  33 ++--
 .../kafka/streams/smoketest/SmokeTestUtil.java     |  23 +--
 .../kafka/streams/smoketest/StreamsSmokeTest.java  |  14 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  78 ++++++++
 .../kafka/test/ProcessorTopologyTestDriver.java    |   2 +-
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 104 +++++++++++
 tests/kafkatest/services/streams.py                | 171 +++++++++++++++++-
 .../tests/streams/streams_upgrade_test.py          | 200 +++++++++++++++++++++
 tests/kafkatest/version.py                         |   9 +-
 vagrant/base.sh                                    |   2 +
 36 files changed, 904 insertions(+), 167 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 1f5140b..77123ff 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,48 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  for file in "$base_dir"/streams/upgrade-system-tests-0100/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index d221d96..2a54021 100644
--- a/build.gradle
+++ b/build.gradle
@@ -776,6 +776,19 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+
 project(':log4j-appender') {
   archivesBaseName = "kafka-log4j-appender"
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 93b92bb..dbbb912 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -918,7 +918,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
                 this.failed.set(new RuntimeException(e));
             } catch (RuntimeException e) {
-                log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e);
+                log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e);
                 this.failed.set(e);
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 212d701..7488748 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -316,7 +316,7 @@ public class Struct {
             Field f = this.schema.get(i);
             if (f.type() instanceof ArrayOf) {
                 if (this.get(f) != null) {
-                    Object[] arrayObject = (Object []) this.get(f);
+                    Object[] arrayObject = (Object[]) this.get(f);
                     for (Object arrayItem: arrayObject)
                         result = prime * result + arrayItem.hashCode();
                 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 8e0b8db..b80dfcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,7 +17,9 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -26,10 +28,7 @@ import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL mechanism
@@ -59,7 +58,7 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
                     nc.setName(nc.getDefaultName());
             } else if (callback instanceof PasswordCallback) {
                 if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
-                    char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
+                    char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
                     ((PasswordCallback) callback).setPassword(password);
                 } else {
                     String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index a844bb0..5186d05 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -26,24 +26,24 @@ public class ProducerRecordTest {
 
     @Test
     public void testEqualsAndHashCode() {
-        ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1, "key", 1);
         assertEquals(producerRecord, producerRecord);
         assertEquals(producerRecord.hashCode(), producerRecord.hashCode());
 
-        ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1, "key", 1);
         assertEquals(producerRecord, equalRecord);
         assertEquals(producerRecord.hashCode(), equalRecord.hashCode());
 
-        ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1 , "key", 1);
+        ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1, "key", 1);
         assertFalse(producerRecord.equals(topicMisMatch));
 
-        ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2 , "key", 1);
+        ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2, "key", 1);
         assertFalse(producerRecord.equals(partitionMismatch));
 
-        ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1 , "key-1", 1);
+        ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1, "key-1", 1);
         assertFalse(producerRecord.equals(keyMisMatch));
 
-        ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1 , "key", 2);
+        ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1, "key", 2);
         assertFalse(producerRecord.equals(valueMisMatch));
 
         ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null);
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e6b9747..faa96c1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -55,6 +55,23 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
 
 <h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Streams API changes in 0.10.1.0</a></h5>
 <ul>
+    <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+    <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
+        </ul>
+    </li>
     <li> Stream grouping and aggregation split into two methods:
         <ul>
             <li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 2ff459f..07944a9 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -31,6 +31,7 @@ versions += [
   jackson: "2.6.3",
   jetty: "9.2.22.v20170606",
   jersey: "2.22.2",
+  kafka_0100: "0.10.0.1",
   log4j: "1.2.17",
   jopt: "4.9",
   junit: "4.12",
@@ -91,6 +92,7 @@ libs += [
   junit: "junit:junit:$versions.junit",
   log4j: "log4j:log4j:$versions.log4j",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
+  kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
   lz4: "net.jpountz.lz4:lz4:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
   powermock: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
index d62b372..a51876c 100644
--- a/gradle/rat.gradle
+++ b/gradle/rat.gradle
@@ -84,9 +84,15 @@ class RatTask extends DefaultTask {
     if (!reportDir.exists()) {
       reportDir.mkdirs()
     }
-    generateXmlReport(reportDir)
-    printUnknownFiles()
-    generateHtmlReport()
+    def origEncoding = System.getProperty("file.encoding")
+    try {
+      System.setProperty("file.encoding", "UTF-8") //affects the output of the ant rat task
+      generateXmlReport(reportDir)
+      printUnknownFiles()
+      generateHtmlReport()
+    } finally {
+      System.setProperty("file.encoding", origEncoding)
+    }
   }
 }
 
@@ -109,7 +115,7 @@ class RatPlugin implements Plugin<Project> {
       mavenCentral()
     }
     project.dependencies {
-      rat 'org.apache.rat:apache-rat-tasks:0.11'
+      rat 'org.apache.rat:apache-rat-tasks:0.12'
     }
   }
 }
diff --git a/jenkins.sh b/jenkins.sh
new file mode 100755
index 0000000..c21eb1d
--- /dev/null
+++ b/jenkins.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script is used for verifying changes in Jenkins. In order to provide faster feedback, the tasks are ordered so
+# that faster tasks are executed in every module before slower tasks (if possible). For example, the unit tests for all
+# the modules are executed before the integration tests.
+./gradlew clean compileJava compileScala compileTestJava compileTestScala checkstyleMain checkstyleTest test rat --no-daemon -PxmlFindBugsReport=true -PtestLoggingEvents=started,passed,skipped,failed "$@"
diff --git a/settings.gradle b/settings.gradle
index d430c2f..f3a1b81 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100',
+        'log4j-appender', 'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5ba4383..e33efef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
  * Configuration for Kafka Streams. Documentation for these configurations can be found in the <a
@@ -54,6 +55,16 @@ public class StreamsConfig extends AbstractConfig {
     // Prefix used to isolate producer configs from consumer configs.
     public static final String PRODUCER_PREFIX = "producer.";
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
+     */
+    public static final String UPGRADE_FROM_0100 = "0.10.0";
+
+    /** {@code upgrade.from} */
+    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+
     /** <code>state.dir</code> */
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store.";
@@ -257,14 +268,19 @@ public class StreamsConfig extends AbstractConfig {
                                         10 * 1024 * 1024L,
                                         atLeast(0),
                                         Importance.LOW,
-                                        CACHE_MAX_BYTES_BUFFERING_DOC);
+                                        CACHE_MAX_BYTES_BUFFERING_DOC)
+                                .define(UPGRADE_FROM_CONFIG,
+                                        ConfigDef.Type.STRING,
+                                        null,
+                                        in(null, UPGRADE_FROM_0100),
+                                        ConfigDef.Importance.LOW,
+                                        UPGRADE_FROM_DOC);
     }
 
     // this is the list of configs for underlying clients
     // that streams prefer different default values
     private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
-    static
-    {
+    static {
         Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
 
@@ -272,8 +288,7 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
-    static
-    {
+    static {
         Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -342,6 +357,7 @@ public class StreamsConfig extends AbstractConfig {
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
 
         // add configs required for stream partition assignor
+        consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
         consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
         consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f1f63..bfca83a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -676,7 +676,7 @@ public class TopologyBuilder {
         }
     }
 
-    private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
+    private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
         final Set<String> sourceTopics = new HashSet<>();
         for (String parent : parents) {
             NodeFactory nodeFactory = nodeFactories.get(parent);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index dcba543..e6c407f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -66,7 +65,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         public final TaskId taskId;
         public final TopicPartition partition;
 
-        public AssignedPartition(TaskId taskId, TopicPartition partition) {
+        AssignedPartition(final TaskId taskId, final TopicPartition partition) {
             this.taskId = taskId;
             this.partition = partition;
         }
@@ -92,6 +91,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     private StreamThread streamThread;
 
+    private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
     private int numStandbyReplicas;
     private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
     private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
@@ -111,6 +111,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
+        final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
+            log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
+            userMetadataVersion = 1;
+        }
 
         Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
@@ -174,7 +179,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Set<TaskId> prevTasks = streamThread.prevTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
+        SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
 
         if (streamThread.builder.sourceTopicPattern() != null) {
             SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -265,12 +270,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         Map<UUID, ClientState<TaskId>> states = new HashMap<>();
         Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
         // decode subscription info
+        int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
 
-
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+            final int usedVersion = info.version;
+            if (usedVersion < minUserMetadataVersion) {
+                minUserMetadataVersion = usedVersion;
+            }
             if (info.userEndPoint != null) {
                 final String[] hostPort = info.userEndPoint.split(":");
                 consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
@@ -460,6 +469,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
 
                 assignmentSuppliers.add(new AssignmentSupplier(consumer,
+                                                               minUserMetadataVersion,
                                                                active,
                                                                standby,
                                                                endPointMap,
@@ -483,17 +493,20 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     class AssignmentSupplier {
         private final String consumer;
+        private final int metadataVersion;
         private final List<TaskId> active;
         private final Map<TaskId, Set<TopicPartition>> standby;
         private final Map<HostInfo, Set<TopicPartition>> endPointMap;
         private final List<TopicPartition> activePartitions;
 
         AssignmentSupplier(final String consumer,
+                           final int metadataVersion,
                            final List<TaskId> active,
                            final Map<TaskId, Set<TopicPartition>> standby,
                            final Map<HostInfo, Set<TopicPartition>> endPointMap,
                            final List<TopicPartition> activePartitions) {
             this.consumer = consumer;
+            this.metadataVersion = metadataVersion;
             this.active = active;
             this.standby = standby;
             this.endPointMap = endPointMap;
@@ -501,7 +514,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         Assignment get() {
-            return new Assignment(activePartitions, new AssignmentInfo(active,
+            return new Assignment(activePartitions, new AssignmentInfo(metadataVersion,
+                                                                       active,
                                                                        standby,
                                                                        endPointMap).encode());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 6569f85..ce9aa63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.common.record.ByteBufferInputStream;
@@ -56,7 +55,7 @@ public class AssignmentInfo {
         this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+    public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
                              Map<HostInfo, Set<TopicPartition>> hostState) {
         this.version = version;
         this.activeTasks = activeTasks;
@@ -155,9 +154,7 @@ public class AssignmentInfo {
                 }
             }
 
-            return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
-
-
+            return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions);
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index c3481c0..92c50a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -32,7 +31,7 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 2;
+    public static final int CURRENT_VERSION = 2;
 
     public final int version;
     public final UUID processId;
@@ -44,7 +43,7 @@ public class SubscriptionInfo {
         this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+    public SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
         this.version = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 35b88db..e4ba9cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -39,11 +40,12 @@ public class KafkaStreamsTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     @Test
-    public void testStartAndClose() throws Exception {
+    public void testStartAndClose() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
@@ -62,11 +64,12 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testCloseIsIdempotent() throws Exception {
+    public void testCloseIsIdempotent() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -79,10 +82,11 @@ public class KafkaStreamsTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testCannotStartOnceClosed() throws Exception {
+    public void testCannotStartOnceClosed() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -99,10 +103,11 @@ public class KafkaStreamsTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testCannotStartTwice() throws Exception {
+    public void testCannotStartTwice() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -119,25 +124,25 @@ public class KafkaStreamsTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWhenNotRunning() {
         final KafkaStreams streams = createKafkaStreams();
         streams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
         final KafkaStreams streams = createKafkaStreams();
         streams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
         final KafkaStreams streams = createKafkaStreams();
         streams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
         final KafkaStreams streams = createKafkaStreams();
         streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
             @Override
@@ -152,16 +157,18 @@ public class KafkaStreamsTest {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         return new KafkaStreams(builder, props);
     }
 
     @Test
-    public void testCleanup() throws Exception {
+    public void testCleanup() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -173,10 +180,11 @@ public class KafkaStreamsTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testCannotCleanupWhileRunning() throws Exception {
+    public void testCannotCleanupWhileRunning() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f03bed9..9d40148 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -58,7 +58,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetProducerConfigs() throws Exception {
+    public void testGetProducerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
         assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer");
         assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
@@ -66,7 +66,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetConsumerConfigs() throws Exception {
+    public void testGetConsumerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
@@ -75,7 +75,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetRestoreConsumerConfigs() throws Exception {
+    public void testGetRestoreConsumerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
@@ -84,7 +84,7 @@ public class StreamsConfigTest {
 
     @Test
     public void defaultSerdeShouldBeConfigured() {
-        Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+        Map<String, Object> serializerConfigs = new HashMap<>();
         serializerConfigs.put("key.serializer.encoding", "UTF8");
         serializerConfigs.put("value.serializer.encoding", "UTF-16");
         Serializer<String> serializer = Serdes.String().serializer();
@@ -115,7 +115,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -125,7 +125,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedRestoreConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -135,7 +135,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -143,7 +143,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -151,7 +151,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(producerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -160,7 +160,7 @@ public class StreamsConfigTest {
 
 
     @Test
-    public void shouldSupportPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportPrefixedProducerConfigs() {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
         props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -170,7 +170,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -180,7 +180,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -190,7 +190,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportNonPrefixedProducerConfigs() {
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -199,24 +199,22 @@ public class StreamsConfigTest {
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
-
-
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.keySerde();
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.valueSerde();
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConfigs() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -226,7 +224,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultProducerConfigs() {
         props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
@@ -234,7 +232,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -244,14 +242,14 @@ public class StreamsConfigTest {
     }
 
     @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.getConsumerConfigs(null, "a", "b");
     }
 
     @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.getRestoreConsumerConfigs("client");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index a5fb076..88098dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -79,13 +80,12 @@ public class FanoutIntegrationTest {
     private static final String OUTPUT_TOPIC_C = "C";
 
     @BeforeClass
-    public static void startKafkaCluster() throws Exception {
+    public static void startKafkaCluster() {
         CLUSTER.createTopic(INPUT_TOPIC_A);
         CLUSTER.createTopic(OUTPUT_TOPIC_B);
         CLUSTER.createTopic(OUTPUT_TOPIC_C);
     }
 
-
     @Parameter
     public long cacheSizeBytes;
 
@@ -117,6 +117,7 @@ public class FanoutIntegrationTest {
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+        streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
         final KStream<byte[], String> stream2 = stream1.mapValues(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index ab08dbe..eeb455b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -122,8 +122,8 @@ public class KStreamAggregationDedupIntegrationTest {
 
         List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 5);
+            new StringDeserializer(),
+            5);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -172,8 +172,8 @@ public class KStreamAggregationDedupIntegrationTest {
 
         List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
+            new StringDeserializer(),
+            10);
 
         Comparator<KeyValue<String, String>>
             comparator =
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index e5560c1..383a793 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -97,11 +97,10 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration = new Properties();
         final String applicationId = "kgrouped-stream-test-" + testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
@@ -155,8 +154,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
+            new StringDeserializer(),
+            10);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -209,8 +208,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 15);
+            new StringDeserializer(),
+            15);
 
         final Comparator<KeyValue<String, String>>
             comparator =
@@ -263,8 +262,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
-            new IntegerDeserializer()
-            , 10);
+            new IntegerDeserializer(),
+            10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
             @Override
@@ -313,8 +312,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
             new StringDeserializer(),
-            new IntegerDeserializer()
-            , 15);
+            new IntegerDeserializer(),
+            15);
 
         final Comparator<KeyValue<String, Integer>>
             comparator =
@@ -364,8 +363,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
+            new LongDeserializer(),
+            10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -406,8 +405,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
+            new LongDeserializer(),
+            10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 5f85536..7848d1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -260,7 +260,7 @@ public class ResetIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e46a016..8a1e13a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -99,13 +100,14 @@ public class StreamPartitionAssignorTest {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             }
         };
     }
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testSubscription() throws Exception {
+    public void testSubscription() {
         StreamsConfig config = new StreamsConfig(configProps());
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -148,7 +150,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignBasic() throws Exception {
+    public void testAssignBasic() {
         StreamsConfig config = new StreamsConfig(configProps());
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -215,7 +217,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithNewTasks() throws Exception {
+    public void testAssignWithNewTasks() {
         StreamsConfig config = new StreamsConfig(configProps());
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -274,7 +276,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithStates() throws Exception {
+    public void testAssignWithStates() {
         StreamsConfig config = new StreamsConfig(configProps());
         String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
@@ -334,7 +336,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithStandbyReplicas() throws Exception {
+    public void testAssignWithStandbyReplicas() {
         Properties props = configProps();
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         StreamsConfig config = new StreamsConfig(props);
@@ -449,7 +451,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testOnAssignment() throws Exception {
+    public void testOnAssignment() {
         StreamsConfig config = new StreamsConfig(configProps());
 
         TopicPartition t2p3 = new TopicPartition("topic2", 3);
@@ -482,7 +484,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithInternalTopics() throws Exception {
+    public void testAssignWithInternalTopics() {
         StreamsConfig config = new StreamsConfig(configProps());
         String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
@@ -521,7 +523,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
+    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
         StreamsConfig config = new StreamsConfig(configProps());
         String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
@@ -555,7 +557,7 @@ public class StreamPartitionAssignorTest {
         subscriptions.put("consumer10",
                           new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        partitionAssignor.assign(metadata, subscriptions);
 
         // check prepared internal topics
         assertEquals(2, internalTopicManager.readyTopics.size());
@@ -563,7 +565,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+    public void shouldAddUserDefinedEndPointToSubscription() {
         final Properties properties = configProps();
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
         final StreamsConfig config = new StreamsConfig(properties);
@@ -589,7 +591,70 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "consumer2",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        final TopologyBuilder builder = new TopologyBuilder();
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            new MockClientSupplier(),
+            "applicationId",
+            "clientId",
+            UUID.randomUUID(),
+            new Metrics(),
+            new SystemTime(),
+            new StreamsMetadataState(builder));
+
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", "clientId"));
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        assertEquals(2, assignment.size());
+        assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version);
+        assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version);
+    }
+
+    @Test
+    public void shouldDownGradeSubscription() {
+        final Properties properties = configProps();
+        properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
+        final StreamsConfig config = new StreamsConfig(properties);
+
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+
+        final String clientId = "client-id";
+        final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId,
+            UUID.randomUUID(), new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
+
+        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+        assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version);
+    }
+
+    @Test
+    public void shouldMapUserEndPointToTopicPartitions() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost:8080";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -628,7 +693,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -655,7 +720,7 @@ public class StreamPartitionAssignorTest {
     }
     
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost:j87yhk";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -682,7 +747,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+    public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
         final Map<HostInfo, Set<TopicPartition>> hostState =
@@ -696,7 +761,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldSetClusterMetadataOnAssignment() throws Exception {
+    public void shouldSetClusterMetadataOnAssignment() {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
         final List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
@@ -718,7 +783,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         final Cluster cluster = partitionAssignor.clusterMetadata();
         assertNotNull(cluster);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2f252e9..2ea5738 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -40,6 +35,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.io.File;
@@ -56,6 +52,11 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
@@ -117,6 +118,7 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             }
         };
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ce94a23..6c94c18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -65,10 +65,9 @@ public class AssignmentInfoTest {
         assertEquals(oldVersion.activeTasks, decoded.activeTasks);
         assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
         assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
-        assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+        assertEquals(1, decoded.version);
     }
 
-
     /**
      * This is a clone of what the V1 encoding did. The encode method has changed for V2
      * so it is impossible to test compatibility without having this
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index f920c51..63ad01d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -105,7 +105,7 @@ public class SmokeTestClient extends SmokeTestUtil {
             }
         });
 
-        data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
+        data.process(SmokeTestUtil.<String, Integer>printProcessorSupplier("data"));
 
         // min
         KGroupedStream<String, Integer>
@@ -131,7 +131,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, intSerde, "min");
 
         KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
-        minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
+        minTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("min"));
 
         // max
         groupedData.aggregate(
@@ -153,7 +153,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, intSerde, "max");
 
         KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
-        maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
+        maxTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("max"));
 
         // sum
         groupedData.aggregate(
@@ -176,7 +176,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
 
         KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
-        sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
+        sumTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("sum"));
 
         // cnt
         groupedData.count(UnlimitedWindows.of(), "uwin-cnt")
@@ -185,7 +185,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, longSerde, "cnt");
 
         KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
-        cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
+        cntTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("cnt"));
 
         // dif
         maxTable.join(minTable,
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index f9d30d5..f103355 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -125,38 +125,48 @@ public class SmokeTestDriver extends SmokeTestUtil {
     }
 
     public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
-        Properties props = new Properties();
+        return generate(kafka, numKeys, maxRecordsPerKey, true);
+    }
+
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final boolean autoTerminate) throws Exception {
+        final Properties props = new Properties();
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
 
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
 
         int numRecordsProduced = 0;
 
-        Map<String, Set<Integer>> allData = new HashMap<>();
-        ValueList[] data = new ValueList[numKeys];
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
         for (int i = 0; i < numKeys; i++) {
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
             allData.put(data[i].key, new HashSet<Integer>());
         }
-        Random rand = new Random();
+        final Random rand = new Random();
 
-        int remaining = data.length;
+        int remaining = 1; // dummy value must be positive if <autoTerminate> is false
+        if (autoTerminate) {
+            remaining = data.length;
+        }
 
         while (remaining > 0) {
-            int index = rand.nextInt(remaining);
-            String key = data[index].key;
+            final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
+            final String key = data[index].key;
             int value = data[index].next();
 
-            if (value < 0) {
+            if (autoTerminate && value < 0) {
                 remaining--;
                 data[index] = data[remaining];
                 value = END;
             }
 
-            ProducerRecord<byte[], byte[]> record =
+            final ProducerRecord<byte[], byte[]> record =
                     new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
 
             producer.send(record);
@@ -165,8 +175,9 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 numRecordsProduced++;
                 allData.get(key).add(value);
 
-                if (numRecordsProduced % 100 == 0)
+                if (numRecordsProduced % 100 == 0) {
                     System.out.println(numRecordsProduced + " records produced");
+                }
 
                 Thread.sleep(10);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 7ff738f..d9ad745 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -37,27 +37,20 @@ public class SmokeTestUtil {
     public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
     public final static int END = Integer.MAX_VALUE;
 
-    public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic) {
-        return printProcessorSupplier(topic, false);
-    }
-
-    public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) {
-        return new ProcessorSupplier<String, T>() {
-            public Processor<String, T> get() {
-                return new AbstractProcessor<String, T>() {
+    public static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
                     private int numRecordsProcessed = 0;
-                    private ProcessorContext context;
 
                     @Override
                     public void init(ProcessorContext context) {
                         System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         numRecordsProcessed = 0;
-                        this.context = context;
                     }
 
                     @Override
-                    public void process(String key, T value) {
-                        if (printOffset) System.out.println(">>> " + context.offset());
+                    public void process(K key, V value) {
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
@@ -65,12 +58,10 @@ public class SmokeTestUtil {
                     }
 
                     @Override
-                    public void punctuate(long timestamp) {
-                    }
+                    public void punctuate(long timestamp) {}
 
                     @Override
-                    public void close() {
-                    }
+                    public void close() {}
                 };
             }
         };
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
index c26544e..3328ae5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -24,7 +24,7 @@ import java.util.Set;
 public class StreamsSmokeTest {
 
     /**
-     *  args ::= command kafka zookeeper stateDir
+     *  args ::= command kafka zookeeper stateDir disableAutoTerminate
      *  command := "run" | "process"
      *
      * @param args
@@ -34,12 +34,14 @@ public class StreamsSmokeTest {
         String kafka = args.length > 1 ? args[1] : null;
         String zookeeper = args.length > 2 ? args[2] : null;
         String stateDir = args.length > 3 ? args[3] : null;
+        boolean disableAutoTerminate = args.length > 4;
 
-        System.out.println("StreamsSmokeTest instance started");
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
         System.out.println("command=" + command);
         System.out.println("kafka=" + kafka);
         System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
         switch (command) {
             case "standalone":
@@ -49,8 +51,12 @@ public class StreamsSmokeTest {
                 // this starts the driver (data generation and result verification)
                 final int numKeys = 10;
                 final int maxRecordsPerKey = 500;
-                Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
-                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                if (disableAutoTerminate) {
+                    SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
+                } else {
+                    Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
                 break;
             case "process":
                 // this starts a KafkaStreams client
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..e771bea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.smoketest.SmokeTestUtil;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String stateDir = args[2];
+        final String upgradeFrom = args.length > 3 ? args[3] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+        }
+
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 83a9092..1bedd87 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -345,7 +345,7 @@ public class ProcessorTopologyTestDriver {
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
             List<PartitionInfo> partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName , id.partition, null, null, null));
+            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
             consumer.updatePartitions(topicName, partitionInfos);
             consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
         }
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..7d3ed43
--- /dev/null
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String stateDir = args[2];
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a63810e..b857bd5 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -33,6 +33,8 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
 
+    CLEAN_NODE_ENABLED = True
+
     logs = {
         "streams_log": {
             "path": LOG_FILE,
@@ -43,6 +45,114 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "streams_log.0-1": {
+            "path": LOG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stdout.0-1": {
+            "path": STDOUT_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stderr.0-1": {
+            "path": STDERR_FILE + ".0-1",
+            "collect_default": True},
+        "streams_log.0-2": {
+            "path": LOG_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stdout.0-2": {
+            "path": STDOUT_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stderr.0-2": {
+            "path": STDERR_FILE + ".0-2",
+            "collect_default": True},
+        "streams_log.0-3": {
+            "path": LOG_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stdout.0-3": {
+            "path": STDOUT_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stderr.0-3": {
+            "path": STDERR_FILE + ".0-3",
+            "collect_default": True},
+        "streams_log.0-4": {
+            "path": LOG_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stdout.0-4": {
+            "path": STDOUT_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stderr.0-4": {
+            "path": STDERR_FILE + ".0-4",
+            "collect_default": True},
+        "streams_log.0-5": {
+            "path": LOG_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stdout.0-5": {
+            "path": STDOUT_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stderr.0-5": {
+            "path": STDERR_FILE + ".0-5",
+            "collect_default": True},
+        "streams_log.0-6": {
+            "path": LOG_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stdout.0-6": {
+            "path": STDOUT_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stderr.0-6": {
+            "path": STDERR_FILE + ".0-6",
+            "collect_default": True},
+        "streams_log.1-1": {
+            "path": LOG_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stdout.1-1": {
+            "path": STDOUT_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stderr.1-1": {
+            "path": STDERR_FILE + ".1-1",
+            "collect_default": True},
+        "streams_log.1-2": {
+            "path": LOG_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stdout.1-2": {
+            "path": STDOUT_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stderr.1-2": {
+            "path": STDERR_FILE + ".1-2",
+            "collect_default": True},
+        "streams_log.1-3": {
+            "path": LOG_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stdout.1-3": {
+            "path": STDOUT_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stderr.1-3": {
+            "path": STDERR_FILE + ".1-3",
+            "collect_default": True},
+        "streams_log.1-4": {
+            "path": LOG_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stdout.1-4": {
+            "path": STDOUT_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stderr.1-4": {
+            "path": STDERR_FILE + ".1-4",
+            "collect_default": True},
+        "streams_log.1-5": {
+            "path": LOG_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stdout.1-5": {
+            "path": STDOUT_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stderr.1-5": {
+            "path": STDERR_FILE + ".1-5",
+            "collect_default": True},
+        "streams_log.1-6": {
+            "path": LOG_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stdout.1-6": {
+            "path": STDOUT_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stderr.1-6": {
+            "path": STDERR_FILE + ".1-6",
+            "collect_default": True},
     }
 
     def __init__(self, context, kafka, command):
@@ -95,7 +205,8 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
 
     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        if self.CLEAN_NODE_ENABLED:
+            node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
 
     def start_cmd(self, node):
         args = self.args.copy()
@@ -120,10 +231,10 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
 
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
 
-        self.logger.info("Starting StreamsSmokeTest process on " + str(node.account))
+        self.logger.info("Starting StreamsTest process on " + str(node.account))
         with node.account.monitor_log(self.STDOUT_FILE) as monitor:
             node.account.ssh(self.start_cmd(node))
-            monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account))
+            monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -132,8 +243,62 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run")
+        self.DISABLE_AUTO_TERMINATE = ""
 
+    def disable_auto_terminate(self):
+        self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['zk'] = self.kafka.zk.connect_setting()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.smoketest.StreamsSmokeTest " \
+              " %(command)s %(kafka)s %(zk)s %(state_dir)s %(disable_auto_terminate)s " \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, context, kafka):
         super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")
+
+class StreamsUpgradeTestJobRunnerService(StreamsSmokeTestBaseService):
+    def __init__(self, context, kafka):
+        super(StreamsUpgradeTestJobRunnerService, self).__init__(context, kafka, "")
+        self.UPGRADE_FROM = ""
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['zk'] = self.kafka.zk.connect_setting()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['version'] = self.KAFKA_STREAMS_VERSION
+        args['upgrade_from'] = self.UPGRADE_FROM
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              " INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+              " %(kafka_run_class)s org.apache.kafka.streams.tests.StreamsUpgradeTest " \
+              " %(kafka)s %(zk)s %(state_dir)s %(upgrade_from)s " \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 0000000..8266e07
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsUpgradeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_0, TRUNK_VERSION
+import random
+
+class StreamsUpgradeTest(KafkaTest):
+    """
+    Test upgrading Kafka Streams from 0.10.0.x to 0.10.1.x (ie, TRUNK)
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+
+    def test_upgrade(self):
+        """
+        Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to 0.10.1
+        """
+
+        self.driver.start()
+        self.start_all_nodes_with_0100()
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        # first rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "0.10.0", counter)
+            counter = counter + 1
+
+        # second rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            self.do_rolling_bounce(p, "", counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    def start_all_nodes_with_0100(self):
+        # start first with 0.10.0
+        self.prepare_for_0100(self.processor1)
+        node1 = self.processor1.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
+                self.processor1.start()
+                log_monitor.wait_until("Kafka version : 0.10.0.1",
+                                       timeout_sec=60,
+                                       err_msg="Could not detect Kafka Streams version 0.10.0.1" + str(node1.account))
+                monitor.wait_until("processed 100 records from topic",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+
+        # start second with 0.10.0
+        self.prepare_for_0100(self.processor2)
+        node2 = self.processor2.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
+                    self.processor2.start()
+                    log_monitor.wait_until("Kafka version : 0.10.0.1",
+                                           timeout_sec=60,
+                                           err_msg="Could not detect Kafka Streams version 0.10.0.1" + str(node2.account))
+                    first_monitor.wait_until("processed 100 records from topic",
+                                             timeout_sec=60,
+                                             err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                    second_monitor.wait_until("processed 100 records from topic",
+                                              timeout_sec=60,
+                                              err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+
+        # start third with 0.10.0
+        self.prepare_for_0100(self.processor3)
+        node3 = self.processor3.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
+                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
+                        self.processor3.start()
+                        log_monitor.wait_until("Kafka version : 0.10.0.1",
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version 0.10.0.1" + str(node3.account))
+                        first_monitor.wait_until("processed 100 records from topic",
+                                                 timeout_sec=60,
+                                                 err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+                        second_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+                        third_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
+
+    @staticmethod
+    def prepare_for_0100(processor):
+        processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+        processor.set_version(str(LATEST_0_10_0))
+
+    def do_rolling_bounce(self, processor, upgrade_from, counter):
+        first_other_processor = None
+        second_other_processor = None
+        for p in self.processors:
+            if p != processor:
+                if first_other_processor is None:
+                    first_other_processor = p
+                else:
+                    second_other_processor = p
+
+        node = processor.node
+        first_other_node = first_other_processor.node
+        second_other_node = second_other_processor.node
+
+        # stop processor and wait for rebalance of others
+        with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+            with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                processor.stop()
+                first_other_monitor.wait_until("processed 100 records from topic",
+                                               timeout_sec=60,
+                                               err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                second_other_monitor.wait_until("processed 100 records from topic",
+                                                timeout_sec=60,
+                                                err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+        node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+        if upgrade_from == "":  # upgrade disabled -- second round of rolling bounces
+            roll_counter = ".1-"  # second round of rolling bounces
+        else:
+            roll_counter = ".0-"  # first  round of rolling boundes
+
+        node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
+
+        processor.set_version("")  # set to TRUNK
+        processor.set_upgrade_from(upgrade_from)
+
+        grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+                    with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                        processor.start()
+
+                        log_monitor.wait_until("Kafka version : " + str(TRUNK_VERSION),
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + str(TRUNK_VERSION) + " " + str(node.account))
+                        first_other_monitor.wait_until("processed 100 records from topic",
+                                                       timeout_sec=60,
+                                                       err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+                        found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        second_other_monitor.wait_until("processed 100 records from topic",
+                                                        timeout_sec=60,
+                                                        err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+                        found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        monitor.wait_until("processed 100 records from topic",
+                                           timeout_sec=60,
+                                           err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
\ No newline at end of file
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 239a9f4..ebf5ecf 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -64,6 +64,7 @@ def get_version(node=None):
         return TRUNK
 
 TRUNK = KafkaVersion("trunk")
+TRUNK_VERSION = KafkaVersion("0.10.1.2-SNAPSHOT")
 
 # 0.8.2.X versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
@@ -78,7 +79,11 @@ LATEST_0_9 = V_0_9_0_1
 # 0.10.0.X versions
 V_0_10_0_0 = KafkaVersion("0.10.0.0")
 V_0_10_0_1 = KafkaVersion("0.10.0.1")
-# Adding 0.10.0 as the next version will be 0.10.1.x
 LATEST_0_10_0 = V_0_10_0_1
 
-LATEST_0_10 = LATEST_0_10_0
\ No newline at end of file
+# 0.10.1.X versions
+V_0_10_1_0 = KafkaVersion("0.10.1.0")
+V_0_10_1_1 = KafkaVersion("0.10.1.1")
+LATEST_0_10_1 = V_0_10_1_1
+
+LATEST_0_10 = LATEST_0_10_1
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 88878dc..b8cde3a 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -52,6 +52,8 @@ get_kafka() {
 
     kafka_dir=/opt/kafka-$version
     url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz
+    # the .tgz above does not include the streams test jar hence we need to get it separately
+    url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
     if [ ! -d /opt/kafka-$version ]; then
         pushd /tmp
         curl -O $url

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.