You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/04/18 07:38:36 UTC

[kafka] branch trunk updated: KAFKA-6054: Update Kafka Streams metadata to version 3 (#4880)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cae4221   KAFKA-6054: Update Kafka Streams metadata to version 3 (#4880)
cae4221 is described below

commit cae42215b7597cc39afc682cb0399782cf42fe23
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Apr 18 09:38:27 2018 +0200

     KAFKA-6054: Update Kafka Streams metadata to version 3 (#4880)
    
     - adds Streams upgrade tests for 1.1 release
     - introduces metadata version 3
    
    Reviewers: John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 build.gradle                                       |  12 ++
 gradle/dependencies.gradle                         |   2 +
 settings.gradle                                    |   5 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  34 +++-
 .../internals/StreamsPartitionAssignor.java        |  36 +++-
 .../internals/assignment/AssignmentInfo.java       | 110 +++++++++---
 .../internals/assignment/SubscriptionInfo.java     | 184 ++++++++++++++++-----
 .../internals/StreamsPartitionAssignorTest.java    |  80 +++++++--
 .../internals/assignment/AssignmentInfoTest.java   | 105 +++++-------
 .../internals/assignment/SubscriptionInfoTest.java |  91 ++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 104 ++++++++++++
 tests/kafkatest/services/streams.py                |   1 +
 .../tests/streams/streams_upgrade_test.py          |  32 ++--
 tests/kafkatest/version.py                         |   6 +-
 14 files changed, 586 insertions(+), 216 deletions(-)

diff --git a/build.gradle b/build.gradle
index 69f560e..8b07a95 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1087,6 +1087,18 @@ project(':streams:upgrade-system-tests-10') {
   }
 }
 
+project(':streams:upgrade-system-tests-11') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-11"
+
+  dependencies {
+    testCompile libs.kafkaStreams_11
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index d6beba9..e5f2958 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -67,6 +67,7 @@ versions += [
   kafka_0102: "0.10.2.1",
   kafka_0110: "0.11.0.2",
   kafka_10: "1.0.1",
+  kafka_11: "1.1.0",
   lz4: "1.4.1",
   metrics: "2.2.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
@@ -115,6 +116,7 @@ libs += [
   kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
   kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
   kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
+  kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
   log4j: "log4j:log4j:$versions.log4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
diff --git a/settings.gradle b/settings.gradle
index 0313684..2a7977c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,5 +15,6 @@
 
 include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
         'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
-        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender',
-        'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
+        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
+        'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
+        'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 819bebd..65b1da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -173,6 +173,31 @@ public class StreamsConfig extends AbstractConfig {
     public static final String UPGRADE_FROM_0100 = "0.10.0";
 
     /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
+     */
+    public static final String UPGRADE_FROM_0101 = "0.10.1";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
+     */
+    public static final String UPGRADE_FROM_0102 = "0.10.2";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
+     */
+    public static final String UPGRADE_FROM_0110 = "0.11.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
+     */
+    public static final String UPGRADE_FROM_10 = "1.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
+     */
+    public static final String UPGRADE_FROM_11 = "1.1";
+
+    /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
     public static final String AT_LEAST_ONCE = "at_least_once";
@@ -347,8 +372,9 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code upgrade.from} */
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
-    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
-        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " +
+        "When upgrading from 1.2 to a newer version it is not required to specify this config. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version).";
 
     /**
      * {@code value.serde}
@@ -364,7 +390,7 @@ public class StreamsConfig extends AbstractConfig {
 
     /**
      * {@code zookeeper.connect}
-     * @deprecated Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored.
+     * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored.
      */
     @Deprecated
     public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
@@ -575,7 +601,7 @@ public class StreamsConfig extends AbstractConfig {
             .define(UPGRADE_FROM_CONFIG,
                     ConfigDef.Type.STRING,
                     null,
-                    in(null, UPGRADE_FROM_0100),
+                    in(null, UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11),
                     Importance.LOW,
                     UPGRADE_FROM_DOC)
             .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 97771e5..c81105e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -199,10 +199,24 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
-        final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
-        if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
-            log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
-            userMetadataVersion = 1;
+        final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (upgradeFrom != null) {
+            switch (upgradeFrom) {
+                case StreamsConfig.UPGRADE_FROM_0100:
+                    log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                    userMetadataVersion = 1;
+                    break;
+                case StreamsConfig.UPGRADE_FROM_0101:
+                case StreamsConfig.UPGRADE_FROM_0102:
+                case StreamsConfig.UPGRADE_FROM_0110:
+                case StreamsConfig.UPGRADE_FROM_10:
+                case StreamsConfig.UPGRADE_FROM_11:
+                    log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                    userMetadataVersion = 2;
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
+            }
         }
 
         final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
@@ -512,7 +526,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         // construct the global partition assignment per host map
         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
-        if (minUserMetadataVersion == 2) {
+        if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) {
             for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
                 final HostInfo hostInfo = entry.getValue().hostInfo;
 
@@ -631,6 +645,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                 partitionsByHost = info.partitionsByHost();
                 break;
+            case 3:
+                processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+                partitionsByHost = info.partitionsByHost();
+                break;
             default:
                 throw new IllegalStateException("Unknown metadata version: " + usedVersion
                     + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION);
@@ -684,6 +702,13 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         }
     }
 
+    private void processVersionThreeAssignment(final AssignmentInfo info,
+                                               final List<TopicPartition> partitions,
+                                               final Map<TaskId, Set<TopicPartition>> activeTasks,
+                                               final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
+        processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+    }
+
     /**
      * Internal helper function that creates a Kafka topic
      *
@@ -818,4 +843,5 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
     void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
     }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index c8df749..3c5cee2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.HostInfo;
@@ -30,6 +30,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,15 +41,20 @@ public class AssignmentInfo {
 
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int UNKNOWN = -1;
 
     private final int usedVersion;
+    private final int latestSupportedVersion;
     private List<TaskId> activeTasks;
     private Map<TaskId, Set<TopicPartition>> standbyTasks;
     private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
-    private AssignmentInfo(final int version) {
+    // used for decoding; don't apply version checks
+    private AssignmentInfo(final int version,
+                           final int latestSupportedVersion) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
     }
 
     public AssignmentInfo(final List<TaskId> activeTasks,
@@ -57,11 +63,33 @@ public class AssignmentInfo {
         this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
     }
 
+    public AssignmentInfo() {
+        this(LATEST_SUPPORTED_VERSION,
+            Collections.<TaskId>emptyList(),
+            Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+            Collections.<HostInfo, Set<TopicPartition>>emptyMap());
+    }
+
     public AssignmentInfo(final int version,
                           final List<TaskId> activeTasks,
                           final Map<TaskId, Set<TopicPartition>> standbyTasks,
                           final Map<HostInfo, Set<TopicPartition>> hostState) {
+        this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
+
+        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+            throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
+                + "; was: " + version);
+        }
+    }
+
+    // for testing only; don't apply version checks
+    AssignmentInfo(final int version,
+                   final int latestSupportedVersion,
+                   final List<TaskId> activeTasks,
+                   final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                   final Map<HostInfo, Set<TopicPartition>> hostState) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.partitionsByHost = hostState;
@@ -71,6 +99,10 @@ public class AssignmentInfo {
         return usedVersion;
     }
 
+    public int latestSupportedVersion() {
+        return latestSupportedVersion;
+    }
+
     public List<TaskId> activeTasks() {
         return activeTasks;
     }
@@ -98,6 +130,9 @@ public class AssignmentInfo {
                 case 2:
                     encodeVersionTwo(out);
                     break;
+                case 3:
+                    encodeVersionThree(out);
+                    break;
                 default:
                     throw new IllegalStateException("Unknown metadata version: " + usedVersion
                         + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@@ -161,6 +196,13 @@ public class AssignmentInfo {
         }
     }
 
+    private void encodeVersionThree(final DataOutputStream out) throws IOException {
+        out.writeInt(3);
+        out.writeInt(LATEST_SUPPORTED_VERSION);
+        encodeActiveAndStandbyTaskAssignment(out);
+        encodePartitionsByHost(out);
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
      */
@@ -169,19 +211,25 @@ public class AssignmentInfo {
         data.rewind();
 
         try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
-            // decode used version
-            final int usedVersion = in.readInt();
-            final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion);
+            final AssignmentInfo assignmentInfo;
 
+            final int usedVersion = in.readInt();
             switch (usedVersion) {
                 case 1:
+                    assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
                     decodeVersionOneData(assignmentInfo, in);
                     break;
                 case 2:
+                    assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
                     decodeVersionTwoData(assignmentInfo, in);
                     break;
+                case 3:
+                    final int latestSupportedVersion = in.readInt();
+                    assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+                    decodeVersionThreeData(assignmentInfo, in);
+                    break;
                 default:
-                    TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
+                    TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " +
                         "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
                     log.error(fatalException.getMessage(), fatalException);
                     throw fatalException;
@@ -195,15 +243,23 @@ public class AssignmentInfo {
 
     private static void decodeVersionOneData(final AssignmentInfo assignmentInfo,
                                              final DataInputStream in) throws IOException {
-        // decode active tasks
-        int count = in.readInt();
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        assignmentInfo.partitionsByHost = new HashMap<>();
+    }
+
+    private static void decodeActiveTasks(final AssignmentInfo assignmentInfo,
+                                          final DataInputStream in) throws IOException {
+        final int count = in.readInt();
         assignmentInfo.activeTasks = new ArrayList<>(count);
         for (int i = 0; i < count; i++) {
             assignmentInfo.activeTasks.add(TaskId.readFrom(in));
         }
+    }
 
-        // decode standby tasks
-        count = in.readInt();
+    private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo,
+                                           final DataInputStream in) throws IOException {
+        final int count = in.readInt();
         assignmentInfo.standbyTasks = new HashMap<>(count);
         for (int i = 0; i < count; i++) {
             TaskId id = TaskId.readFrom(in);
@@ -213,9 +269,13 @@ public class AssignmentInfo {
 
     private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo,
                                              final DataInputStream in) throws IOException {
-        decodeVersionOneData(assignmentInfo, in);
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        decodeGlobalAssignmentData(assignmentInfo, in);
+    }
 
-        // decode partitions by host
+    private static void decodeGlobalAssignmentData(final AssignmentInfo assignmentInfo,
+                                                   final DataInputStream in) throws IOException {
         assignmentInfo.partitionsByHost = new HashMap<>();
         final int numEntries = in.readInt();
         for (int i = 0; i < numEntries; i++) {
@@ -233,19 +293,27 @@ public class AssignmentInfo {
         return partitions;
     }
 
+    private static void decodeVersionThreeData(final AssignmentInfo assignmentInfo,
+                                               final DataInputStream in) throws IOException {
+        decodeActiveTasks(assignmentInfo, in);
+        decodeStandbyTasks(assignmentInfo, in);
+        decodeGlobalAssignmentData(assignmentInfo, in);
+    }
+
     @Override
     public int hashCode() {
-        return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
+        return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
     }
 
     @Override
     public boolean equals(final Object o) {
         if (o instanceof AssignmentInfo) {
             final AssignmentInfo other = (AssignmentInfo) o;
-            return this.usedVersion == other.usedVersion &&
-                    this.activeTasks.equals(other.activeTasks) &&
-                    this.standbyTasks.equals(other.standbyTasks) &&
-                    this.partitionsByHost.equals(other.partitionsByHost);
+            return usedVersion == other.usedVersion &&
+                    latestSupportedVersion == other.latestSupportedVersion &&
+                    activeTasks.equals(other.activeTasks) &&
+                    standbyTasks.equals(other.standbyTasks) &&
+                    partitionsByHost.equals(other.partitionsByHost);
         } else {
             return false;
         }
@@ -253,7 +321,11 @@ public class AssignmentInfo {
 
     @Override
     public String toString() {
-        return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
+        return "[version=" + usedVersion
+            + ", supported version=" + latestSupportedVersion
+            + ", active tasks=" + activeTasks
+            + ", standby tasks=" + standbyTasks
+            + ", global assignment=" + partitionsByHost + "]";
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 7fee90b..be70947 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -31,16 +32,21 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    public static final int LATEST_SUPPORTED_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 3;
+    public static final int UNKNOWN = -1;
 
     private final int usedVersion;
+    private final int latestSupportedVersion;
     private UUID processId;
     private Set<TaskId> prevTasks;
     private Set<TaskId> standbyTasks;
     private String userEndPoint;
 
-    private SubscriptionInfo(final int version) {
+    // used for decoding; don't apply version checks
+    private SubscriptionInfo(final int version,
+                             final int latestSupportedVersion) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
     }
 
     public SubscriptionInfo(final UUID processId,
@@ -55,7 +61,23 @@ public class SubscriptionInfo {
                             final Set<TaskId> prevTasks,
                             final Set<TaskId> standbyTasks,
                             final String userEndPoint) {
+        this(version, LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
+
+        if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+            throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
+                + "; was: " + version);
+        }
+    }
+
+    // for testing only; don't apply version checks
+    protected SubscriptionInfo(final int version,
+                               final int latestSupportedVersion,
+                               final UUID processId,
+                               final Set<TaskId> prevTasks,
+                               final Set<TaskId> standbyTasks,
+                               final String userEndPoint) {
         this.usedVersion = version;
+        this.latestSupportedVersion = latestSupportedVersion;
         this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
@@ -66,6 +88,10 @@ public class SubscriptionInfo {
         return usedVersion;
     }
 
+    public int latestSupportedVersion() {
+        return latestSupportedVersion;
+    }
+
     public UUID processId() {
         return processId;
     }
@@ -93,7 +119,10 @@ public class SubscriptionInfo {
                 buf = encodeVersionOne();
                 break;
             case 2:
-                buf = encodeVersionTwo(prepareUserEndPoint());
+                buf = encodeVersionTwo();
+                break;
+            case 3:
+                buf = encodeVersionThree();
                 break;
             default:
                 throw new IllegalStateException("Unknown metadata version: " + usedVersion
@@ -108,7 +137,9 @@ public class SubscriptionInfo {
         final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
 
         buf.putInt(1); // version
-        encodeVersionOneData(buf);
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
 
         return buf;
     }
@@ -120,18 +151,15 @@ public class SubscriptionInfo {
                4 + standbyTasks.size() * 8; // length + standby tasks
     }
 
-    private void encodeVersionOneData(final ByteBuffer buf) {
-        // encode client UUID
+    private void encodeClientUUID(final ByteBuffer buf) {
         buf.putLong(processId.getMostSignificantBits());
         buf.putLong(processId.getLeastSignificantBits());
-        // encode ids of previously running tasks
-        buf.putInt(prevTasks.size());
-        for (TaskId id : prevTasks) {
-            id.writeTo(buf);
-        }
-        // encode ids of cached tasks
-        buf.putInt(standbyTasks.size());
-        for (TaskId id : standbyTasks) {
+    }
+
+    private void encodeTasks(final ByteBuffer buf,
+                             final Collection<TaskId> taskIds) {
+        buf.putInt(taskIds.size());
+        for (TaskId id : taskIds) {
             id.writeTo(buf);
         }
     }
@@ -144,52 +172,87 @@ public class SubscriptionInfo {
         }
     }
 
-    private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
+    private ByteBuffer encodeVersionTwo() {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
         final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
 
         buf.putInt(2); // version
-        encodeVersionTwoData(buf, endPointBytes);
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
+        encodeUserEndPoint(buf, endPointBytes);
 
         return buf;
     }
 
     private int getVersionTwoByteLength(final byte[] endPointBytes) {
-        return getVersionOneByteLength() +
+        return 4 + // version
+               16 + // client ID
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8 + // length + standby tasks
                4 + endPointBytes.length; // length + userEndPoint
     }
 
-    private void encodeVersionTwoData(final ByteBuffer buf,
-                                      final byte[] endPointBytes) {
-        encodeVersionOneData(buf);
+    private void encodeUserEndPoint(final ByteBuffer buf,
+                                    final byte[] endPointBytes) {
         if (endPointBytes != null) {
             buf.putInt(endPointBytes.length);
             buf.put(endPointBytes);
         }
     }
 
+    private ByteBuffer encodeVersionThree() {
+        final byte[] endPointBytes = prepareUserEndPoint();
+
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+
+        buf.putInt(3); // used version
+        buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+        encodeClientUUID(buf);
+        encodeTasks(buf, prevTasks);
+        encodeTasks(buf, standbyTasks);
+        encodeUserEndPoint(buf, endPointBytes);
+
+        return buf;
+    }
+
+    private int getVersionThreeByteLength(final byte[] endPointBytes) {
+        return 4 + // used version
+               4 + // latest supported version
+               16 + // client ID
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8 + // length + standby tasks
+               4 + endPointBytes.length; // length + userEndPoint
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data
      */
     public static SubscriptionInfo decode(final ByteBuffer data) {
+        final SubscriptionInfo subscriptionInfo;
+
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
 
-        // decode used version
         final int usedVersion = data.getInt();
-        final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion);
-
         switch (usedVersion) {
             case 1:
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
                 decodeVersionOneData(subscriptionInfo, data);
                 break;
             case 2:
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
                 decodeVersionTwoData(subscriptionInfo, data);
                 break;
+            case 3:
+                final int latestSupportedVersion = data.getInt();
+                subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
+                decodeVersionThreeData(subscriptionInfo, data);
+                break;
             default:
-                TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
-                    "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
-                log.error(fatalException.getMessage(), fatalException);
-                throw fatalException;
+                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
+                log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
         }
 
         return subscriptionInfo;
@@ -197,30 +260,43 @@ public class SubscriptionInfo {
 
     private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo,
                                              final ByteBuffer data) {
-        // decode client UUID
-        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+        decodeClientUUID(subscriptionInfo, data);
 
-        // decode previously active tasks
-        final int numPrevs = data.getInt();
         subscriptionInfo.prevTasks = new HashSet<>();
-        for (int i = 0; i < numPrevs; i++) {
-            TaskId id = TaskId.readFrom(data);
-            subscriptionInfo.prevTasks.add(id);
-        }
+        decodeTasks(subscriptionInfo.prevTasks, data);
 
-        // decode previously cached tasks
-        final int numCached = data.getInt();
         subscriptionInfo.standbyTasks = new HashSet<>();
-        for (int i = 0; i < numCached; i++) {
-            subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+    }
+
+    private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo,
+                                         final ByteBuffer data) {
+        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+    }
+
+    private static void decodeTasks(final Collection<TaskId> taskIds,
+                                    final ByteBuffer data) {
+        final int numPrevs = data.getInt();
+        for (int i = 0; i < numPrevs; i++) {
+            taskIds.add(TaskId.readFrom(data));
         }
     }
 
     private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo,
                                              final ByteBuffer data) {
-        decodeVersionOneData(subscriptionInfo, data);
+        decodeClientUUID(subscriptionInfo, data);
+
+        subscriptionInfo.prevTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.prevTasks, data);
 
-        // decode user end point (can be null)
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+
+        decodeUserEndPoint(subscriptionInfo, data);
+    }
+
+    private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo,
+                                           final ByteBuffer data) {
         int bytesLength = data.getInt();
         if (bytesLength != 0) {
             final byte[] bytes = new byte[bytesLength];
@@ -229,9 +305,21 @@ public class SubscriptionInfo {
         }
     }
 
-    @Override
+    private static void decodeVersionThreeData(final SubscriptionInfo subscriptionInfo,
+                                               final ByteBuffer data) {
+        decodeClientUUID(subscriptionInfo, data);
+
+        subscriptionInfo.prevTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.prevTasks, data);
+
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        decodeTasks(subscriptionInfo.standbyTasks, data);
+
+        decodeUserEndPoint(subscriptionInfo, data);
+    }
+
     public int hashCode() {
-        final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+        final int hashCode = usedVersion ^ latestSupportedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
         if (userEndPoint == null) {
             return hashCode;
         }
@@ -243,6 +331,7 @@ public class SubscriptionInfo {
         if (o instanceof SubscriptionInfo) {
             final SubscriptionInfo other = (SubscriptionInfo) o;
             return this.usedVersion == other.usedVersion &&
+                    this.latestSupportedVersion == other.latestSupportedVersion &&
                     this.processId.equals(other.processId) &&
                     this.prevTasks.equals(other.prevTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
@@ -252,4 +341,13 @@ public class SubscriptionInfo {
         }
     }
 
+    @Override
+    public String toString() {
+        return "[version=" + usedVersion
+            + ", supported version=" + latestSupportedVersion
+            + ", process ID=" + processId
+            + ", prev tasks=" + prevTasks
+            + ", standby tasks=" + standbyTasks
+            + ", user endpoint=" + userEndPoint + "]";
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index e9ed968..4e04b49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -46,7 +46,6 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -64,6 +63,7 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 public class StreamsPartitionAssignorTest {
 
@@ -867,9 +867,12 @@ public class StreamsPartitionAssignorTest {
         final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
         final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
-        assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
+        assertEquals(
+            Utils.mkSet(
+                new TopicPartition("topic1", 0),
                 new TopicPartition("topic1", 1),
-                new TopicPartition("topic1", 2)), topicPartitions);
+                new TopicPartition("topic1", 2)),
+            topicPartitions);
     }
 
     @Test
@@ -881,7 +884,7 @@ public class StreamsPartitionAssignorTest {
 
         try {
             configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost"));
-            Assert.fail("expected to an exception due to invalid config");
+            fail("expected to an exception due to invalid config");
         } catch (ConfigException e) {
             // pass
         }
@@ -893,7 +896,7 @@ public class StreamsPartitionAssignorTest {
 
         try {
             configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk"));
-            Assert.fail("expected to an exception due to invalid config");
+            fail("expected to an exception due to invalid config");
         } catch (ConfigException e) {
             // pass
         }
@@ -1088,21 +1091,36 @@ public class StreamsPartitionAssignorTest {
     }
 
     @Test
-    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() {
+        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 2);
+    }
+
+    @Test
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() {
+        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 3);
+    }
+
+    @Test
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() {
+        shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(2, 3);
+    }
+
+    private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion,
+                                                                                     final int otherVersion) {
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put(
             "consumer1",
             new PartitionAssignor.Subscription(
                 Collections.singletonList("topic1"),
-                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+                new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
             )
         );
         subscriptions.put(
             "consumer2",
             new PartitionAssignor.Subscription(
                 Collections.singletonList("topic1"),
-                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+                new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
             )
         );
 
@@ -1115,12 +1133,12 @@ public class StreamsPartitionAssignorTest {
         final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
 
         assertThat(assignment.size(), equalTo(2));
-        assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1));
-        assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1));
+        assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion));
+        assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion));
     }
 
     @Test
-    public void shouldDownGradeSubscription() {
+    public void shouldDownGradeSubscriptionToVersion1() {
         final Set<TaskId> emptyTasks = Collections.emptySet();
 
         mockTaskManager(
@@ -1135,6 +1153,46 @@ public class StreamsPartitionAssignorTest {
         assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1));
     }
 
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For0101() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101);
+    }
+
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For0102() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102);
+    }
+
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For0110() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110);
+    }
+
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For10() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10);
+    }
+
+    @Test
+    public void shouldDownGradeSubscriptionToVersion2For11() {
+        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11);
+    }
+
+    private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue) {
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+
+        mockTaskManager(
+            emptyTasks,
+            emptyTasks,
+            UUID.randomUUID(),
+            builder);
+        configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue));
+
+        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+        assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2));
+    }
+
     private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
         final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
                                                        Collections.<TaskId, Set<TopicPartition>>emptyMap(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index c1020a9..c7382e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -22,85 +22,70 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.HostInfo;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class AssignmentInfoTest {
+    private final List<TaskId> activeTasks = Arrays.asList(
+        new TaskId(0, 0),
+        new TaskId(0, 0),
+        new TaskId(0, 1), new TaskId(1, 0));
+    private final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>() {
+        {
+            put(new TaskId(1, 1),
+                Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+            put(new TaskId(2, 0),
+                Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+        }
+    };
+    private final Map<HostInfo, Set<TopicPartition>> globalAssignment = new HashMap<HostInfo, Set<TopicPartition>>() {
+        {
+            put(new HostInfo("localhost", 80),
+                Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t3", 3)));
+        }
+    };
 
     @Test
-    public void testEncodeDecode() {
-        List<TaskId> activeTasks =
-                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
-        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
-        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+    public void shouldUseLatestSupportedVersionByDefault() {
+        final AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, globalAssignment);
+        assertEquals(AssignmentInfo.LATEST_SUPPORTED_VERSION, info.version());
+    }
 
-        AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
-        AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownVersion1() {
+        new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment);
+    }
 
-        assertEquals(info, decoded);
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownVersion2() {
+        new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment);
     }
 
     @Test
-    public void shouldDecodePreviousVersion() throws IOException {
-        List<TaskId> activeTasks =
-                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
-        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
-        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
-        final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null);
-        final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
-        assertEquals(oldVersion.activeTasks(), decoded.activeTasks());
-        assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks());
-        assertNull(decoded.partitionsByHost()); // should be null as wasn't in V1
-        assertEquals(1, decoded.version());
+    public void shouldEncodeAndDecodeVersion1() {
+        final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, Set<TopicPartition>>emptyMap());
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
-    /**
-     * This is a clone of what the V1 encoding did. The encode method has changed for V2
-     * so it is impossible to test compatibility without having this
-     */
-    private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(baos);
-        // Encode version
-        out.writeInt(oldVersion.version());
-        // Encode active tasks
-        out.writeInt(oldVersion.activeTasks().size());
-        for (TaskId id : oldVersion.activeTasks()) {
-            id.writeTo(out);
-        }
-        // Encode standby tasks
-        out.writeInt(oldVersion.standbyTasks().size());
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks().entrySet()) {
-            TaskId id = entry.getKey();
-            id.writeTo(out);
-
-            Set<TopicPartition> partitions = entry.getValue();
-            out.writeInt(partitions.size());
-            for (TopicPartition partition : partitions) {
-                out.writeUTF(partition.topic());
-                out.writeInt(partition.partition());
-            }
-        }
-
-        out.flush();
-        out.close();
-
-        return ByteBuffer.wrap(baos.toByteArray());
+    @Test
+    public void shouldEncodeAndDecodeVersion2() {
+        final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+    }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion3() {
+        final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment);
+        final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment);
+        assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 633285a..e98b8ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -19,81 +19,60 @@ package org.apache.kafka.streams.processor.internals.assignment;
 import org.apache.kafka.streams.processor.TaskId;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 public class SubscriptionInfoTest {
+    private final UUID processId = UUID.randomUUID();
+    private final Set<TaskId> activeTasks = new HashSet<>(Arrays.asList(
+        new TaskId(0, 0),
+        new TaskId(0, 1),
+        new TaskId(1, 0)));
+    private final Set<TaskId> standbyTasks = new HashSet<>(Arrays.asList(
+        new TaskId(1, 1),
+        new TaskId(2, 0)));
 
-    @Test
-    public void testEncodeDecode() {
-        UUID processId = UUID.randomUUID();
+    private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
 
-        Set<TaskId> activeTasks =
-                new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
-        Set<TaskId> standbyTasks =
-                new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
+    @Test
+    public void shouldUseLatestSupportedVersionByDefault() {
+        final SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION, info.version());
+    }
 
-        SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, null);
-        SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownVersion1() {
+        new SubscriptionInfo(0, processId, activeTasks, standbyTasks, "localhost:80");
+    }
 
-        assertEquals(info, decoded);
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowForUnknownVersion2() {
+        new SubscriptionInfo(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, processId, activeTasks, standbyTasks, "localhost:80");
     }
 
     @Test
-    public void shouldEncodeDecodeWithUserEndPoint() {
-        SubscriptionInfo original = new SubscriptionInfo(UUID.randomUUID(),
-                Collections.singleton(new TaskId(0, 0)), Collections.<TaskId>emptySet(), "localhost:80");
-        SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode());
-        assertEquals(original, decoded);
+    public void shouldEncodeAndDecodeVersion1() {
+        final SubscriptionInfo info = new SubscriptionInfo(1, processId, activeTasks, standbyTasks, IGNORED_USER_ENDPOINT);
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(1, SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, null);
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
     @Test
-    public void shouldBeBackwardCompatible() {
-        UUID processId = UUID.randomUUID();
-
-        Set<TaskId> activeTasks =
-                new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
-        Set<TaskId> standbyTasks =
-                new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
-
-        final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks);
-        final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
-        assertEquals(activeTasks, decode.prevTasks());
-        assertEquals(standbyTasks, decode.standbyTasks());
-        assertEquals(processId, decode.processId());
-        assertNull(decode.userEndPoint());
+    public void shouldEncodeAndDecodeVersion2() {
+        final SubscriptionInfo info = new SubscriptionInfo(2, processId, activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(2, SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
-    /**
-     * This is a clone of what the V1 encoding did. The encode method has changed for V2
-     * so it is impossible to test compatibility without having this
-     */
-    private ByteBuffer encodePreviousVersion(UUID processId,  Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
-        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
-        // version
-        buf.putInt(1);
-        // encode client UUID
-        buf.putLong(processId.getMostSignificantBits());
-        buf.putLong(processId.getLeastSignificantBits());
-        // encode ids of previously running tasks
-        buf.putInt(prevTasks.size());
-        for (TaskId id : prevTasks) {
-            id.writeTo(buf);
-        }
-        // encode ids of cached tasks
-        buf.putInt(standbyTasks.size());
-        for (TaskId id : standbyTasks) {
-            id.writeTo(buf);
-        }
-        buf.rewind();
-
-        return buf;
+    @Test
+    public void shouldEncodeAndDecodeVersion3() {
+        final SubscriptionInfo info = new SubscriptionInfo(3, processId, activeTasks, standbyTasks, "localhost:80");
+        final SubscriptionInfo expectedInfo = new SubscriptionInfo(3, SubscriptionInfo.LATEST_SUPPORTED_VERSION, processId, activeTasks, standbyTasks, "localhost:80");
+        assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
+
 }
diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..a8796cb
--- /dev/null
+++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    /**
+     * This test cannot be executed until Kafka 1.1.1 is released
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 2) {
+            System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] : ""));
+        }
+        final String kafka = args[0];
+        final String propFileName = args.length > 1 ? args[1] : null;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.1)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("props=" + streamsProperties);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 796ca31..a4b902a 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -412,6 +412,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
                                                                  "org.apache.kafka.streams.tests.StreamsUpgradeTest",
                                                                  "")
         self.UPGRADE_FROM = None
+        self.UPGRADE_TO = None
 
     def set_version(self, kafka_streams_version):
         self.KAFKA_STREAMS_VERSION = kafka_streams_version
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index fa79d57..8b7d771 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -23,8 +23,16 @@ from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATES
 import random
 import time
 
+# broker 0.10.0 is not compatible with newer Kafka Streams versions
 broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(DEV_BRANCH)]
-simple_upgrade_versions_metadata_version_2 = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(DEV_VERSION)]
+
+metadata_1_versions = [str(LATEST_0_10_0)]
+metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+# we can add the following versions to `backward_compatible_metadata_2_versions` after the corresponding
+# bug-fix releases 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, and 1.1.1 are available:
+# str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)
+backward_compatible_metadata_2_versions = []
+metadata_3_versions = [str(DEV_VERSION)]
 
 class StreamsUpgradeTest(Test):
     """
@@ -39,6 +47,7 @@ class StreamsUpgradeTest(Test):
             'echo' : { 'partitions': 5 },
             'data' : { 'partitions': 5 },
         }
+        self.leader = None
 
     def perform_broker_upgrade(self, to_version):
         self.logger.info("First pass bounce - rolling broker upgrade")
@@ -114,7 +123,7 @@ class StreamsUpgradeTest(Test):
         node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
         self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
 
-    @matrix(from_version=simple_upgrade_versions_metadata_version_2, to_version=simple_upgrade_versions_metadata_version_2)
+    @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
     def test_simple_upgrade_downgrade(self, from_version, to_version):
         """
         Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>
@@ -165,15 +174,12 @@ class StreamsUpgradeTest(Test):
 
         self.driver.stop()
 
-    #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
-    #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released
-    #@parametrize(new_version=str(LATEST_0_11_0)) we cannot run this test until Kafka 0.11.0.3 is released
-    #@parametrize(new_version=str(LATEST_1_0)) we cannot run this test until Kafka 1.0.2 is released
-    #@parametrize(new_version=str(LATEST_1_1)) we cannot run this test until Kafka 1.1.1 is released
-    @parametrize(new_version=str(DEV_VERSION))
-    def test_metadata_upgrade(self, new_version):
+    #@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
+    @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions)
+    @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions)
+    def test_metadata_upgrade(self, from_version, to_version):
         """
-        Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
         """
 
         self.zk = ZookeeperService(self.test_context, num_nodes=1)
@@ -189,7 +195,7 @@ class StreamsUpgradeTest(Test):
         self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
 
         self.driver.start()
-        self.start_all_nodes_with(str(LATEST_0_10_0))
+        self.start_all_nodes_with(from_version)
 
         self.processors = [self.processor1, self.processor2, self.processor3]
 
@@ -200,13 +206,13 @@ class StreamsUpgradeTest(Test):
         random.shuffle(self.processors)
         for p in self.processors:
             p.CLEAN_NODE_ENABLED = False
-            self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+            self.do_rolling_bounce(p, from_version[:-2], to_version, counter)
             counter = counter + 1
 
         # second rolling bounce
         random.shuffle(self.processors)
         for p in self.processors:
-            self.do_rolling_bounce(p, None, new_version, counter)
+            self.do_rolling_bounce(p, None, to_version, counter)
             counter = counter + 1
 
         # shutdown
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 66e5fcf..7823efa 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -63,17 +63,17 @@ def get_version(node=None):
 DEV_BRANCH = KafkaVersion("dev")
 DEV_VERSION = KafkaVersion("1.2.0-SNAPSHOT")
 
-# 0.8.2.X versions
+# 0.8.2.x versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
 V_0_8_2_2 = KafkaVersion("0.8.2.2")
 LATEST_0_8_2 = V_0_8_2_2
 
-# 0.9.0.X versions
+# 0.9.0.x versions
 V_0_9_0_0 = KafkaVersion("0.9.0.0")
 V_0_9_0_1 = KafkaVersion("0.9.0.1")
 LATEST_0_9 = V_0_9_0_1
 
-# 0.10.0.X versions
+# 0.10.0.x versions
 V_0_10_0_0 = KafkaVersion("0.10.0.0")
 V_0_10_0_1 = KafkaVersion("0.10.0.1")
 LATEST_0_10_0 = V_0_10_0_1

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.