You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/05 18:56:49 UTC

[kafka] branch trunk updated: KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path (#4630)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2a4ba75  KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path (#4630)
2a4ba75 is described below

commit 2a4ba75e1338eaa97da87077330b43d7448a18bc
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Mar 5 10:56:42 2018 -0800

    KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path (#4630)
    
    Author: Matthias J. Sax <ma...@confluent.io>
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |   4 +-
 .../internals/StreamsPartitionAssignor.java        | 230 ++++++++++++--------
 .../internals/assignment/AssignmentInfo.java       | 236 +++++++++++++--------
 .../internals/assignment/SubscriptionInfo.java     | 224 +++++++++++++------
 .../integration/QueryableStateIntegrationTest.java |   1 -
 .../internals/StreamsPartitionAssignorTest.java    |  95 ++++++---
 .../internals/assignment/AssignmentInfoTest.java   |  19 +-
 .../internals/assignment/SubscriptionInfoTest.java |  10 +-
 8 files changed, 530 insertions(+), 289 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6b36261..47becfc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -226,11 +226,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
     private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
-    /** {@code default timestamp.extractor} */
+    /** {@code default.timestamp.extractor} */
     public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
     private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
 
-    /** {@code default value.serde} */
+    /** {@code default.value.serde} */
     public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
     private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 9aa0e94..71a84b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -66,7 +66,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         public final TaskId taskId;
         public final TopicPartition partition;
 
-        AssignedPartition(final TaskId taskId, final TopicPartition partition) {
+        AssignedPartition(final TaskId taskId,
+                          final TopicPartition partition) {
             this.taskId = taskId;
             this.partition = partition;
         }
@@ -77,11 +78,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         @Override
-        public boolean equals(Object o) {
+        public boolean equals(final Object o) {
             if (!(o instanceof AssignedPartition)) {
                 return false;
             }
-            AssignedPartition other = (AssignedPartition) o;
+            final AssignedPartition other = (AssignedPartition) o;
             return compareTo(other) == 0;
         }
 
@@ -104,8 +105,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 final String host = getHost(endPoint);
                 final Integer port = getPort(endPoint);
 
-                if (host == null || port == null)
+                if (host == null || port == null) {
                     throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
+                }
 
                 hostInfo = new HostInfo(host, port);
             } else {
@@ -119,10 +121,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             state = new ClientState();
         }
 
-        void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
+        void addConsumer(final String consumerMemberId,
+                         final SubscriptionInfo info) {
             consumers.add(consumerMemberId);
-            state.addPreviousActiveTasks(info.prevTasks);
-            state.addPreviousStandbyTasks(info.standbyTasks);
+            state.addPreviousActiveTasks(info.prevTasks());
+            state.addPreviousStandbyTasks(info.standbyTasks());
             state.incrementCapacity();
         }
 
@@ -157,8 +160,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
     private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
         @Override
-        public int compare(TopicPartition p1, TopicPartition p2) {
-            int result = p1.topic().compareTo(p2.topic());
+        public int compare(final TopicPartition p1,
+                           final TopicPartition p2) {
+            final int result = p1.topic().compareTo(p2.topic());
 
             if (result != 0) {
                 return result;
@@ -194,15 +198,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
         if (o == null) {
-            KafkaException ex = new KafkaException("TaskManager is not specified");
-            log.error(ex.getMessage(), ex);
-            throw ex;
+            final KafkaException fatalException = new KafkaException("TaskManager is not specified");
+            log.error(fatalException.getMessage(), fatalException);
+            throw fatalException;
         }
 
         if (!(o instanceof TaskManager)) {
-            KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
-            log.error(ex.getMessage(), ex);
-            throw ex;
+            final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
+            log.error(fatalException.getMessage(), fatalException);
+            throw fatalException;
         }
 
         taskManager = (TaskManager) o;
@@ -214,14 +218,14 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         final String userEndPoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
             try {
-                String host = getHost(userEndPoint);
-                Integer port = getPort(userEndPoint);
+                final String host = getHost(userEndPoint);
+                final Integer port = getPort(userEndPoint);
 
                 if (host == null || port == null)
                     throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" +
                                     " but received %s",
                             logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
-            } catch (NumberFormatException nfe) {
+            } catch (final NumberFormatException nfe) {
                 throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s",
                         logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
             }
@@ -240,7 +244,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
     }
 
     @Override
-    public Subscription subscription(Set<String> topics) {
+    public Subscription subscription(final Set<String> topics) {
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Task ids of previously running tasks
@@ -249,7 +253,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds();
         final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
         standbyTasks.removeAll(previousActiveTasks);
-        final SubscriptionInfo data = new SubscriptionInfo(taskManager.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
+        final SubscriptionInfo data = new SubscriptionInfo(
+            taskManager.processId(),
+            previousActiveTasks,
+            standbyTasks,
+            this.userEndPoint);
 
         taskManager.updateSubscriptionsFromMetadata(topics);
 
@@ -277,22 +285,32 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
      * 3. within each client, tasks are assigned to consumer clients in round-robin manner.
      */
     @Override
-    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+    public Map<String, Assignment> assign(final Cluster metadata,
+                                          final Map<String, Subscription> subscriptions) {
         // construct the client metadata from the decoded subscription info
-        Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
-
-        for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
-            String consumerId = entry.getKey();
-            Subscription subscription = entry.getValue();
-
-            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+        final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+
+        int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+        for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
+            final String consumerId = entry.getKey();
+            final Subscription subscription = entry.getValue();
+
+            final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+            final int usedVersion = info.version();
+            if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+                throw new IllegalStateException("Unknown metadata version: " + usedVersion
+                    + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+            }
+            if (usedVersion < minUserMetadataVersion) {
+                minUserMetadataVersion = usedVersion;
+            }
 
             // create the new client metadata if necessary
-            ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
+            ClientMetadata clientMetadata = clientsMetadata.get(info.processId());
 
             if (clientMetadata == null) {
-                clientMetadata = new ClientMetadata(info.userEndPoint);
-                clientsMetadata.put(info.processId, clientMetadata);
+                clientMetadata = new ClientMetadata(info.userEndPoint());
+                clientsMetadata.put(info.processId(), clientMetadata);
             }
 
             // add the consumer to the client
@@ -309,8 +327,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = taskManager.builder().topicGroups();
 
         final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
-        for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
-            for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
+        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
                 repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
             }
         }
@@ -319,13 +337,13 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         do {
             numPartitionsNeeded = false;
 
-            for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
-                for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
                     int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
 
                     // try set the number of partitions for this repartition topic if it is not set yet
                     if (numPartitions == UNKNOWN) {
-                        for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+                        for (final InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                             final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
 
                             if (otherSinkTopics.contains(topicName)) {
@@ -375,7 +393,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         // augment the metadata with the newly computed number of partitions for all the
         // repartition source topics
         final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
-        for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
+        for (final Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
             final String topic = entry.getKey();
             final int numPartitions = entry.getValue().numPartitions;
 
@@ -395,7 +413,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         // get the tasks as partition groups from the partition grouper
         final Set<String> allSourceTopics = new HashSet<>();
         final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
-        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             allSourceTopics.addAll(entry.getValue().sourceTopics);
             sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
         }
@@ -405,9 +423,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         // check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
         final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
         final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
             final Set<TopicPartition> partitions = entry.getValue();
-            for (TopicPartition partition : partitions) {
+            for (final TopicPartition partition : partitions) {
                 if (allAssignedPartitions.contains(partition)) {
                     log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask);
                 }
@@ -422,10 +440,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             }
             ids.add(id);
         }
-        for (String topic : allSourceTopics) {
+        for (final String topic : allSourceTopics) {
             final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
             if (!partitionInfoList.isEmpty()) {
-                for (PartitionInfo partitionInfo : partitionInfoList) {
+                for (final PartitionInfo partitionInfo : partitionInfoList) {
                     final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {
                         log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask);
@@ -438,15 +456,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         // add tasks to state change log topic subscribers
         final Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
-        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             final int topicGroupId = entry.getKey();
             final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
 
-            for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
+            for (final InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
                 // the expected number of partitions is the max value of TaskId.partition + 1
                 int numPartitions = UNKNOWN;
                 if (tasksByTopicGroup.get(topicGroupId) != null) {
-                    for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
+                    for (final TaskId task : tasksByTopicGroup.get(topicGroupId)) {
                         if (numPartitions < task.partition + 1)
                             numPartitions = task.partition + 1;
                     }
@@ -468,7 +486,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         // assign tasks to clients
         final Map<UUID, ClientState> states = new HashMap<>();
-        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+        for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
             states.put(entry.getKey(), entry.getValue().state);
         }
 
@@ -484,25 +502,27 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         // construct the global partition assignment per host map
         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
-        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
-            final HostInfo hostInfo = entry.getValue().hostInfo;
+        if (minUserMetadataVersion == 2) {
+            for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+                final HostInfo hostInfo = entry.getValue().hostInfo;
 
-            if (hostInfo != null) {
-                final Set<TopicPartition> topicPartitions = new HashSet<>();
-                final ClientState state = entry.getValue().state;
+                if (hostInfo != null) {
+                    final Set<TopicPartition> topicPartitions = new HashSet<>();
+                    final ClientState state = entry.getValue().state;
 
-                for (final TaskId id : state.activeTasks()) {
-                    topicPartitions.addAll(partitionsForTask.get(id));
-                }
+                    for (final TaskId id : state.activeTasks()) {
+                        topicPartitions.addAll(partitionsForTask.get(id));
+                    }
 
-                partitionsByHostState.put(hostInfo, topicPartitions);
+                    partitionsByHostState.put(hostInfo, topicPartitions);
+                }
             }
         }
         taskManager.setPartitionsByHostState(partitionsByHostState);
 
         // within the client, distribute tasks to its owned consumers
         final Map<String, Assignment> assignment = new HashMap<>();
-        for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+        for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
             final Set<String> consumers = entry.getValue().consumers;
             final ClientState state = entry.getValue().state;
 
@@ -511,7 +531,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
             int consumerTaskIndex = 0;
 
-            for (String consumer : consumers) {
+            for (final String consumer : consumers) {
                 final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
                 final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
 
@@ -540,13 +560,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 Collections.sort(assignedPartitions);
                 final List<TaskId> active = new ArrayList<>();
                 final List<TopicPartition> activePartitions = new ArrayList<>();
-                for (AssignedPartition partition : assignedPartitions) {
+                for (final AssignedPartition partition : assignedPartitions) {
                     active.add(partition.taskId);
                     activePartitions.add(partition.partition);
                 }
 
                 // finally, encode the assignment before sending back to coordinator
-                assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
+                assignment.put(consumer, new Assignment(
+                    activePartitions,
+                    new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode()));
             }
         }
 
@@ -577,26 +599,54 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
      * @throws TaskAssignmentException if there is no task id for one of the partitions specified
      */
     @Override
-    public void onAssignment(Assignment assignment) {
-        List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
+    public void onAssignment(final Assignment assignment) {
+        final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
         Collections.sort(partitions, PARTITION_COMPARATOR);
 
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+        final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+        final int usedVersion = info.version();
 
-        Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        // version 1 field
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        // version 2 fields
+        final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
+        final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
+
+        switch (usedVersion) {
+            case 1:
+                processVersionOneAssignment(info, partitions, activeTasks);
+                partitionsByHost = Collections.emptyMap();
+                break;
+            case 2:
+                processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+                partitionsByHost = info.partitionsByHost();
+                break;
+            default:
+                throw new IllegalStateException("Unknown metadata version: " + usedVersion
+                    + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION);
+        }
 
+        taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
+        taskManager.setPartitionsByHostState(partitionsByHost);
+        taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
+        taskManager.updateSubscriptionsFromAssignment(partitions);
+    }
+
+    private void processVersionOneAssignment(final AssignmentInfo info,
+                                             final List<TopicPartition> partitions,
+                                             final Map<TaskId, Set<TopicPartition>> activeTasks) {
         // the number of assigned partitions should be the same as number of active tasks, which
         // could be duplicated if one task has more than one assigned partitions
-        if (partitions.size() != info.activeTasks.size()) {
+        if (partitions.size() != info.activeTasks().size()) {
             throw new TaskAssignmentException(
-                    String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
-                            ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString())
+                String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
+                    ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks().size(), info.toString())
             );
         }
 
         for (int i = 0; i < partitions.size(); i++) {
-            TopicPartition partition = partitions.get(i);
-            TaskId id = info.activeTasks.get(i);
+            final TopicPartition partition = partitions.get(i);
+            final TaskId id = info.activeTasks().get(i);
 
             Set<TopicPartition> assignedPartitions = activeTasks.get(id);
             if (assignedPartitions == null) {
@@ -605,23 +655,23 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             }
             assignedPartitions.add(partition);
         }
+    }
 
-        final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
-        for (Set<TopicPartition> value : info.partitionsByHost.values()) {
-            for (TopicPartition topicPartition : value) {
-                topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(),
-                        topicPartition.partition(),
-                        null,
-                        new Node[0],
-                        new Node[0]));
+    private void processVersionTwoAssignment(final AssignmentInfo info,
+                                             final List<TopicPartition> partitions,
+                                             final Map<TaskId, Set<TopicPartition>> activeTasks,
+                                             final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
+        processVersionOneAssignment(info, partitions, activeTasks);
+
+        // process partitions by host
+        final Map<HostInfo, Set<TopicPartition>> partitionsByHost = info.partitionsByHost();
+        for (final Set<TopicPartition> value : partitionsByHost.values()) {
+            for (final TopicPartition topicPartition : value) {
+                topicToPartitionInfo.put(
+                    topicPartition,
+                    new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0]));
             }
         }
-
-        taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
-        taskManager.setPartitionsByHostState(info.partitionsByHost);
-        taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks);
-
-        taskManager.updateSubscriptionsFromAssignment(partitions);
     }
 
     /**
@@ -658,10 +708,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
         log.debug("Completed validating internal topics in partition assignor.");
     }
 
-    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
-                                      Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
-                                      Cluster metadata) {
-        for (Set<String> copartitionGroup : copartitionGroups) {
+    private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups,
+                                      final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+                                      final Cluster metadata) {
+        for (final Set<String> copartitionGroup : copartitionGroups) {
             copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
         }
     }
@@ -677,7 +727,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
 
         private final Set<String> updatedTopicSubscriptions = new HashSet<>();
 
-        public void updateTopics(Collection<String> topicNames) {
+        public void updateTopics(final Collection<String> topicNames) {
             updatedTopicSubscriptions.clear();
             updatedTopicSubscriptions.addAll(topicNames);
         }
@@ -735,7 +785,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
             // if all topics for this co-partition group is repartition topics,
             // then set the number of partitions to be the maximum of the number of partitions.
             if (numPartitions == UNKNOWN) {
-                for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
+                for (final Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
                     if (copartitionGroup.contains(entry.getKey())) {
                         final int partitions = entry.getValue().numPartitions;
                         if (partitions > numPartitions) {
@@ -745,7 +795,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
                 }
             }
             // enforce co-partitioning restrictions to repartition topics by updating their number of partitions
-            for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
+            for (final Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
                 if (copartitionGroup.contains(entry.getKey())) {
                     entry.getValue().numPartitions = numPartitions;
                 }
@@ -755,7 +805,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
     }
 
     // following functions are for test only
-    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+    void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
         this.internalTopicManager = internalTopicManager;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 8607472..c8df749 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -39,76 +39,123 @@ import java.util.Set;
 public class AssignmentInfo {
 
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
-    /**
-     * A new field was added, partitionsByHost. CURRENT_VERSION
-     * is required so we can decode the previous version. For example, this may occur
-     * during a rolling upgrade
-     */
-    private static final int CURRENT_VERSION = 2;
-    public final int version;
-    public final List<TaskId> activeTasks; // each element corresponds to a partition
-    public final Map<TaskId, Set<TopicPartition>> standbyTasks;
-    public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
-    public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
-                          Map<HostInfo, Set<TopicPartition>> hostState) {
-        this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
+    public static final int LATEST_SUPPORTED_VERSION = 2;
+
+    private final int usedVersion;
+    private List<TaskId> activeTasks;
+    private Map<TaskId, Set<TopicPartition>> standbyTasks;
+    private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
+
+    private AssignmentInfo(final int version) {
+        this.usedVersion = version;
+    }
+
+    public AssignmentInfo(final List<TaskId> activeTasks,
+                          final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                          final Map<HostInfo, Set<TopicPartition>> hostState) {
+        this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
-                             Map<HostInfo, Set<TopicPartition>> hostState) {
-        this.version = version;
+    public AssignmentInfo(final int version,
+                          final List<TaskId> activeTasks,
+                          final Map<TaskId, Set<TopicPartition>> standbyTasks,
+                          final Map<HostInfo, Set<TopicPartition>> hostState) {
+        this.usedVersion = version;
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.partitionsByHost = hostState;
     }
 
+    public int version() {
+        return usedVersion;
+    }
+
+    public List<TaskId> activeTasks() {
+        return activeTasks;
+    }
+
+    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
+        return standbyTasks;
+    }
+
+    public Map<HostInfo, Set<TopicPartition>> partitionsByHost() {
+        return partitionsByHost;
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an
      * IO exception during encoding
      */
     public ByteBuffer encode() {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(baos);
-
-        try {
-            // Encode version
-            out.writeInt(version);
-            // Encode active tasks
-            out.writeInt(activeTasks.size());
-            for (TaskId id : activeTasks) {
-                id.writeTo(out);
-            }
-            // Encode standby tasks
-            out.writeInt(standbyTasks.size());
-            for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
-                TaskId id = entry.getKey();
-                id.writeTo(out);
-
-                Set<TopicPartition> partitions = entry.getValue();
-                writeTopicPartitions(out, partitions);
-            }
-            out.writeInt(partitionsByHost.size());
-            for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost
-                    .entrySet()) {
-                final HostInfo hostInfo = entry.getKey();
-                out.writeUTF(hostInfo.host());
-                out.writeInt(hostInfo.port());
-                writeTopicPartitions(out, entry.getValue());
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (final DataOutputStream out = new DataOutputStream(baos)) {
+            switch (usedVersion) {
+                case 1:
+                    encodeVersionOne(out);
+                    break;
+                case 2:
+                    encodeVersionTwo(out);
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown metadata version: " + usedVersion
+                        + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
             }
 
             out.flush();
             out.close();
 
             return ByteBuffer.wrap(baos.toByteArray());
-        } catch (IOException ex) {
+        } catch (final IOException ex) {
             throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
         }
     }
 
-    private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> partitions) throws IOException {
+    private void encodeVersionOne(final DataOutputStream out) throws IOException {
+        out.writeInt(1); // version
+        encodeActiveAndStandbyTaskAssignment(out);
+    }
+
+    private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out) throws IOException {
+        // encode active tasks
+        out.writeInt(activeTasks.size());
+        for (final TaskId id : activeTasks) {
+            id.writeTo(out);
+        }
+
+        // encode standby tasks
+        out.writeInt(standbyTasks.size());
+        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+            final TaskId id = entry.getKey();
+            id.writeTo(out);
+
+            final Set<TopicPartition> partitions = entry.getValue();
+            writeTopicPartitions(out, partitions);
+        }
+    }
+
+    private void encodeVersionTwo(final DataOutputStream out) throws IOException {
+        out.writeInt(2); // version
+        encodeActiveAndStandbyTaskAssignment(out);
+        encodePartitionsByHost(out);
+    }
+
+    private void encodePartitionsByHost(final DataOutputStream out) throws IOException {
+        // encode partitions by host
+        out.writeInt(partitionsByHost.size());
+        for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
+            final HostInfo hostInfo = entry.getKey();
+            out.writeUTF(hostInfo.host());
+            out.writeInt(hostInfo.port());
+            writeTopicPartitions(out, entry.getValue());
+        }
+    }
+
+    private void writeTopicPartitions(final DataOutputStream out,
+                                      final Set<TopicPartition> partitions) throws IOException {
         out.writeInt(partitions.size());
-        for (TopicPartition partition : partitions) {
+        for (final TopicPartition partition : partitions) {
             out.writeUTF(partition.topic());
             out.writeInt(partition.partition());
         }
@@ -117,52 +164,69 @@ public class AssignmentInfo {
     /**
      * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
      */
-    public static AssignmentInfo decode(ByteBuffer data) {
+    public static AssignmentInfo decode(final ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
 
-        try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
-            // Decode version
-            int version = in.readInt();
-            if (version < 0 || version > CURRENT_VERSION) {
-                TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
-                log.error(ex.getMessage(), ex);
-                throw ex;
-            }
+        try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
+            // decode used version
+            final int usedVersion = in.readInt();
+            final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion);
 
-            // Decode active tasks
-            int count = in.readInt();
-            List<TaskId> activeTasks = new ArrayList<>(count);
-            for (int i = 0; i < count; i++) {
-                activeTasks.add(TaskId.readFrom(in));
-            }
-            // Decode standby tasks
-            count = in.readInt();
-            Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
-            for (int i = 0; i < count; i++) {
-                TaskId id = TaskId.readFrom(in);
-                standbyTasks.put(id, readTopicPartitions(in));
+            switch (usedVersion) {
+                case 1:
+                    decodeVersionOneData(assignmentInfo, in);
+                    break;
+                case 2:
+                    decodeVersionTwoData(assignmentInfo, in);
+                    break;
+                default:
+                    TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " +
+                        "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+                    log.error(fatalException.getMessage(), fatalException);
+                    throw fatalException;
             }
 
-            Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = new HashMap<>();
-            if (version == CURRENT_VERSION) {
-                int numEntries = in.readInt();
-                for (int i = 0; i < numEntries; i++) {
-                    HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
-                    hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in));
-                }
-            }
+            return assignmentInfo;
+        } catch (final IOException ex) {
+            throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
+        }
+    }
+
+    private static void decodeVersionOneData(final AssignmentInfo assignmentInfo,
+                                             final DataInputStream in) throws IOException {
+        // decode active tasks
+        int count = in.readInt();
+        assignmentInfo.activeTasks = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            assignmentInfo.activeTasks.add(TaskId.readFrom(in));
+        }
 
-            return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
+        // decode standby tasks
+        count = in.readInt();
+        assignmentInfo.standbyTasks = new HashMap<>(count);
+        for (int i = 0; i < count; i++) {
+            TaskId id = TaskId.readFrom(in);
+            assignmentInfo.standbyTasks.put(id, readTopicPartitions(in));
+        }
+    }
 
-        } catch (IOException ex) {
-            throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
+    private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo,
+                                             final DataInputStream in) throws IOException {
+        decodeVersionOneData(assignmentInfo, in);
+
+        // decode partitions by host
+        assignmentInfo.partitionsByHost = new HashMap<>();
+        final int numEntries = in.readInt();
+        for (int i = 0; i < numEntries; i++) {
+            final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
+            assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in));
         }
     }
 
-    private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throws IOException {
-        int numPartitions = in.readInt();
-        Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+    private static Set<TopicPartition> readTopicPartitions(final DataInputStream in) throws IOException {
+        final int numPartitions = in.readInt();
+        final Set<TopicPartition> partitions = new HashSet<>(numPartitions);
         for (int j = 0; j < numPartitions; j++) {
             partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
         }
@@ -171,14 +235,14 @@ public class AssignmentInfo {
 
     @Override
     public int hashCode() {
-        return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
+        return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
     }
 
     @Override
-    public boolean equals(Object o) {
+    public boolean equals(final Object o) {
         if (o instanceof AssignmentInfo) {
-            AssignmentInfo other = (AssignmentInfo) o;
-            return this.version == other.version &&
+            final AssignmentInfo other = (AssignmentInfo) o;
+            return this.usedVersion == other.usedVersion &&
                     this.activeTasks.equals(other.activeTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
                     this.partitionsByHost.equals(other.partitionsByHost);
@@ -189,7 +253,7 @@ public class AssignmentInfo {
 
     @Override
     public String toString() {
-        return "[version=" + version + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
+        return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
     }
 
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index f583dba..7fee90b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -31,42 +31,96 @@ public class SubscriptionInfo {
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 2;
+    public static final int LATEST_SUPPORTED_VERSION = 2;
 
-    public final int version;
-    public final UUID processId;
-    public final Set<TaskId> prevTasks;
-    public final Set<TaskId> standbyTasks;
-    public final String userEndPoint;
+    private final int usedVersion;
+    private UUID processId;
+    private Set<TaskId> prevTasks;
+    private Set<TaskId> standbyTasks;
+    private String userEndPoint;
 
-    public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
-        this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
+    private SubscriptionInfo(final int version) {
+        this.usedVersion = version;
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
-        this.version = version;
+    public SubscriptionInfo(final UUID processId,
+                            final Set<TaskId> prevTasks,
+                            final Set<TaskId> standbyTasks,
+                            final String userEndPoint) {
+        this(LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
+    }
+
+    public SubscriptionInfo(final int version,
+                            final UUID processId,
+                            final Set<TaskId> prevTasks,
+                            final Set<TaskId> standbyTasks,
+                            final String userEndPoint) {
+        this.usedVersion = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
         this.standbyTasks = standbyTasks;
         this.userEndPoint = userEndPoint;
     }
 
+    public int version() {
+        return usedVersion;
+    }
+
+    public UUID processId() {
+        return processId;
+    }
+
+    public Set<TaskId> prevTasks() {
+        return prevTasks;
+    }
+
+    public Set<TaskId> standbyTasks() {
+        return standbyTasks;
+    }
+
+    public String userEndPoint() {
+        return userEndPoint;
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to encode the data
      */
     public ByteBuffer encode() {
-        byte[] endPointBytes;
-        if (userEndPoint == null) {
-            endPointBytes = new byte[0];
-        } else {
-            endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8"));
+        final ByteBuffer buf;
+
+        switch (usedVersion) {
+            case 1:
+                buf = encodeVersionOne();
+                break;
+            case 2:
+                buf = encodeVersionTwo(prepareUserEndPoint());
+                break;
+            default:
+                throw new IllegalStateException("Unknown metadata version: " + usedVersion
+                    + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
         }
-        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 +
-                prevTasks.size() * 8 + 4 + standbyTasks.size() * 8
-                + 4 /* length of bytes */ + endPointBytes.length
-        );
-        // version
-        buf.putInt(version);
+
+        buf.rewind();
+        return buf;
+    }
+
+    private ByteBuffer encodeVersionOne() {
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
+
+        buf.putInt(1); // version
+        encodeVersionOneData(buf);
+
+        return buf;
+    }
+
+    private int getVersionOneByteLength() {
+        return 4 + // version
+               16 + // process ID (UUID encoded as two longs)
+               4 + prevTasks.size() * 8 + // length + prev tasks
+               4 + standbyTasks.size() * 8; // length + standby tasks
+    }
+
+    private void encodeVersionOneData(final ByteBuffer buf) {
         // encode client UUID
         buf.putLong(processId.getMostSignificantBits());
         buf.putLong(processId.getLeastSignificantBits());
@@ -80,60 +134,104 @@ public class SubscriptionInfo {
         for (TaskId id : standbyTasks) {
             id.writeTo(buf);
         }
-        buf.putInt(endPointBytes.length);
-        buf.put(endPointBytes);
-        buf.rewind();
+    }
+
+    private byte[] prepareUserEndPoint() {
+        if (userEndPoint == null) {
+            return new byte[0];
+        } else {
+            return userEndPoint.getBytes(Charset.forName("UTF-8"));
+        }
+    }
+
+    private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
+        final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
+
+        buf.putInt(2); // version
+        encodeVersionTwoData(buf, endPointBytes);
+
         return buf;
     }
 
+    private int getVersionTwoByteLength(final byte[] endPointBytes) {
+        return getVersionOneByteLength() +
+               4 + endPointBytes.length; // length + userEndPoint
+    }
+
+    private void encodeVersionTwoData(final ByteBuffer buf,
+                                      final byte[] endPointBytes) {
+        encodeVersionOneData(buf);
+        if (endPointBytes != null) {
+            buf.putInt(endPointBytes.length);
+            buf.put(endPointBytes);
+        }
+    }
+
     /**
      * @throws TaskAssignmentException if method fails to decode the data
      */
-    public static SubscriptionInfo decode(ByteBuffer data) {
+    public static SubscriptionInfo decode(final ByteBuffer data) {
         // ensure we are at the beginning of the ByteBuffer
         data.rewind();
 
-        // Decode version
-        int version = data.getInt();
-        if (version == CURRENT_VERSION || version == 1) {
-            // Decode client UUID
-            UUID processId = new UUID(data.getLong(), data.getLong());
-            // Decode previously active tasks
-            Set<TaskId> prevTasks = new HashSet<>();
-            int numPrevs = data.getInt();
-            for (int i = 0; i < numPrevs; i++) {
-                TaskId id = TaskId.readFrom(data);
-                prevTasks.add(id);
-            }
-            // Decode previously cached tasks
-            Set<TaskId> standbyTasks = new HashSet<>();
-            int numCached = data.getInt();
-            for (int i = 0; i < numCached; i++) {
-                standbyTasks.add(TaskId.readFrom(data));
-            }
-
-            String userEndPoint = null;
-            if (version == CURRENT_VERSION) {
-                int bytesLength = data.getInt();
-                if (bytesLength != 0) {
-                    byte[] bytes = new byte[bytesLength];
-                    data.get(bytes);
-                    userEndPoint = new String(bytes, Charset.forName("UTF-8"));
-                }
-
-            }
-            return new SubscriptionInfo(version, processId, prevTasks, standbyTasks, userEndPoint);
+        // decode used version
+        final int usedVersion = data.getInt();
+        final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion);
+
+        switch (usedVersion) {
+            case 1:
+                decodeVersionOneData(subscriptionInfo, data);
+                break;
+            case 2:
+                decodeVersionTwoData(subscriptionInfo, data);
+                break;
+            default:
+                TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
+                    "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+                log.error(fatalException.getMessage(), fatalException);
+                throw fatalException;
+        }
+
+        return subscriptionInfo;
+    }
 
-        } else {
-            TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
-            log.error(ex.getMessage(), ex);
-            throw ex;
+    private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo,
+                                             final ByteBuffer data) {
+        // decode client UUID
+        subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+
+        // decode previously active tasks
+        final int numPrevs = data.getInt();
+        subscriptionInfo.prevTasks = new HashSet<>();
+        for (int i = 0; i < numPrevs; i++) {
+            TaskId id = TaskId.readFrom(data);
+            subscriptionInfo.prevTasks.add(id);
+        }
+
+        // decode previously cached tasks
+        final int numCached = data.getInt();
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        for (int i = 0; i < numCached; i++) {
+            subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
+        }
+    }
+
+    private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo,
+                                             final ByteBuffer data) {
+        decodeVersionOneData(subscriptionInfo, data);
+
+        // decode user end point (can be null)
+        int bytesLength = data.getInt();
+        if (bytesLength != 0) {
+            final byte[] bytes = new byte[bytesLength];
+            data.get(bytes);
+            subscriptionInfo.userEndPoint = new String(bytes, Charset.forName("UTF-8"));
         }
     }
 
     @Override
     public int hashCode() {
-        int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+        final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
         if (userEndPoint == null) {
             return hashCode;
         }
@@ -141,10 +239,10 @@ public class SubscriptionInfo {
     }
 
     @Override
-    public boolean equals(Object o) {
+    public boolean equals(final Object o) {
         if (o instanceof SubscriptionInfo) {
-            SubscriptionInfo other = (SubscriptionInfo) o;
-            return this.version == other.version &&
+            final SubscriptionInfo other = (SubscriptionInfo) o;
+            return this.usedVersion == other.usedVersion &&
                     this.processId.equals(other.processId) &&
                     this.prevTasks.equals(other.prevTasks) &&
                     this.standbyTasks.equals(other.standbyTasks) &&
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 8b4e895..bb06c72 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -376,7 +376,6 @@ public class QueryableStateIntegrationTest {
         }
     }
 
-
     @Test
     public void queryOnRebalance() throws InterruptedException {
         final int numThreads = STREAM_TWO_PARTITIONS;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index bf3f1d1..b0c0d68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -239,17 +239,17 @@ public class StreamsPartitionAssignorTest {
 
         // the first consumer
         AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        allActiveTasks.addAll(info10.activeTasks());
 
         // the second consumer
         AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
-        allActiveTasks.addAll(info11.activeTasks);
+        allActiveTasks.addAll(info11.activeTasks());
 
         assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
 
         // the third consumer
         AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
-        allActiveTasks.addAll(info20.activeTasks);
+        allActiveTasks.addAll(info20.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -317,13 +317,13 @@ public class StreamsPartitionAssignorTest {
         final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
 
         final List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3);
-        assertEquals(expectedInfo10TaskIds, info10.activeTasks);
+        assertEquals(expectedInfo10TaskIds, info10.activeTasks());
 
         // the second consumer
         final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
         final List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2);
 
-        assertEquals(expectedInfo11TaskIds, info11.activeTasks);
+        assertEquals(expectedInfo11TaskIds, info11.activeTasks());
     }
 
     @Test
@@ -354,7 +354,7 @@ public class StreamsPartitionAssignorTest {
         // check assignment info
         Set<TaskId> allActiveTasks = new HashSet<>();
         AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        allActiveTasks.addAll(info10.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -394,7 +394,7 @@ public class StreamsPartitionAssignorTest {
         // check assignment info
         Set<TaskId> allActiveTasks = new HashSet<>();
         AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        allActiveTasks.addAll(info10.activeTasks());
 
         assertEquals(0, allActiveTasks.size());
         assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
@@ -407,7 +407,7 @@ public class StreamsPartitionAssignorTest {
 
         // the first consumer
         info10 = checkAssignment(allTopics, assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
+        allActiveTasks.addAll(info10.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -455,15 +455,15 @@ public class StreamsPartitionAssignorTest {
         AssignmentInfo info;
 
         info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        allActiveTasks.addAll(info.activeTasks);
+        allActiveTasks.addAll(info.activeTasks());
         allPartitions.addAll(assignments.get("consumer10").partitions());
 
         info = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        allActiveTasks.addAll(info.activeTasks);
+        allActiveTasks.addAll(info.activeTasks());
         allPartitions.addAll(assignments.get("consumer11").partitions());
 
         info = AssignmentInfo.decode(assignments.get("consumer20").userData());
-        allActiveTasks.addAll(info.activeTasks);
+        allActiveTasks.addAll(info.activeTasks());
         allPartitions.addAll(assignments.get("consumer20").partitions());
 
         assertEquals(allTasks, allActiveTasks);
@@ -524,14 +524,14 @@ public class StreamsPartitionAssignorTest {
         AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
         AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
 
-        assertEquals(2, info10.activeTasks.size());
-        assertEquals(2, info11.activeTasks.size());
-        assertEquals(2, info20.activeTasks.size());
+        assertEquals(2, info10.activeTasks().size());
+        assertEquals(2, info11.activeTasks().size());
+        assertEquals(2, info20.activeTasks().size());
 
         Set<TaskId> allTasks = new HashSet<>();
-        allTasks.addAll(info10.activeTasks);
-        allTasks.addAll(info11.activeTasks);
-        allTasks.addAll(info20.activeTasks);
+        allTasks.addAll(info10.activeTasks());
+        allTasks.addAll(info11.activeTasks());
+        allTasks.addAll(info20.activeTasks());
         assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
@@ -603,15 +603,15 @@ public class StreamsPartitionAssignorTest {
 
         // the first consumer
         AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
-        allActiveTasks.addAll(info10.activeTasks);
-        allStandbyTasks.addAll(info10.standbyTasks.keySet());
+        allActiveTasks.addAll(info10.activeTasks());
+        allStandbyTasks.addAll(info10.standbyTasks().keySet());
 
         // the second consumer
         AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
-        allActiveTasks.addAll(info11.activeTasks);
-        allStandbyTasks.addAll(info11.standbyTasks.keySet());
+        allActiveTasks.addAll(info11.activeTasks());
+        allStandbyTasks.addAll(info11.standbyTasks().keySet());
 
-        assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
+        assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet());
 
         // check active tasks assigned to the first client
         assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
@@ -619,8 +619,8 @@ public class StreamsPartitionAssignorTest {
 
         // the third consumer
         AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
-        allActiveTasks.addAll(info20.activeTasks);
-        allStandbyTasks.addAll(info20.standbyTasks.keySet());
+        allActiveTasks.addAll(info20.activeTasks());
+        allStandbyTasks.addAll(info20.standbyTasks().keySet());
 
         // all task ids are in the active tasks and also in the standby tasks
 
@@ -847,7 +847,7 @@ public class StreamsPartitionAssignorTest {
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
         final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
-        assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
+        assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
     }
 
     @Test
@@ -874,7 +874,7 @@ public class StreamsPartitionAssignorTest {
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
         final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
-        final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
+        final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
         assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
                 new TopicPartition("topic1", 1),
                 new TopicPartition("topic1", 2)), topicPartitions);
@@ -1072,8 +1072,8 @@ public class StreamsPartitionAssignorTest {
         final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
         final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
-        final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
-        final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
+        final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
+        final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090));
         final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions);
         allAssignedPartitions.addAll(consumer2Partitions);
         assertThat(consumer1partitions, not(allPartitions));
@@ -1095,6 +1095,37 @@ public class StreamsPartitionAssignorTest {
         partitionAssignor.configure(config);
     }
 
+    @Test
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() throws Exception {
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "consumer2",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+
+        mockTaskManager(Collections.<TaskId>emptySet(),
+            Collections.<TaskId>emptySet(),
+            UUID.randomUUID(),
+            new InternalTopologyBuilder());
+        partitionAssignor.configure(configProps());
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        assertThat(assignment.size(), equalTo(2));
+        assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1));
+        assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1));
+    }
+
     private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
         final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
                                                        Collections.<TaskId, Set<TopicPartition>>emptyMap(),
@@ -1111,7 +1142,7 @@ public class StreamsPartitionAssignorTest {
         AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
 
         // check if the number of assigned partitions == the size of active task id list
-        assertEquals(assignment.partitions().size(), info.activeTasks.size());
+        assertEquals(assignment.partitions().size(), info.activeTasks().size());
 
         // check if active tasks are consistent
         List<TaskId> activeTasks = new ArrayList<>();
@@ -1121,14 +1152,14 @@ public class StreamsPartitionAssignorTest {
             activeTasks.add(new TaskId(0, partition.partition()));
             activeTopics.add(partition.topic());
         }
-        assertEquals(activeTasks, info.activeTasks);
+        assertEquals(activeTasks, info.activeTasks());
 
         // check if active partitions cover all topics
         assertEquals(expectedTopics, activeTopics);
 
         // check if standby tasks are consistent
         Set<String> standbyTopics = new HashSet<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) {
             TaskId id = entry.getKey();
             Set<TopicPartition> partitions = entry.getValue();
             for (TopicPartition partition : partitions) {
@@ -1139,7 +1170,7 @@ public class StreamsPartitionAssignorTest {
             }
         }
 
-        if (info.standbyTasks.size() > 0) {
+        if (info.standbyTasks().size() > 0) {
             // check if standby partitions cover all topics
             assertEquals(expectedTopics, standbyTopics);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ec94ad8..726a562 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class AssignmentInfoTest {
 
@@ -61,10 +62,10 @@ public class AssignmentInfoTest {
         standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
         final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null);
         final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
-        assertEquals(oldVersion.activeTasks, decoded.activeTasks);
-        assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
-        assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
-        assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+        assertEquals(oldVersion.activeTasks(), decoded.activeTasks());
+        assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks());
+        assertNull(decoded.partitionsByHost()); // should be null as it wasn't part of V1
+        assertEquals(1, decoded.version());
     }
 
 
@@ -76,15 +77,15 @@ public class AssignmentInfoTest {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(baos);
         // Encode version
-        out.writeInt(oldVersion.version);
+        out.writeInt(oldVersion.version());
         // Encode active tasks
-        out.writeInt(oldVersion.activeTasks.size());
-        for (TaskId id : oldVersion.activeTasks) {
+        out.writeInt(oldVersion.activeTasks().size());
+        for (TaskId id : oldVersion.activeTasks()) {
             id.writeTo(out);
         }
         // Encode standby tasks
-        out.writeInt(oldVersion.standbyTasks.size());
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks.entrySet()) {
+        out.writeInt(oldVersion.standbyTasks().size());
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks().entrySet()) {
             TaskId id = entry.getKey();
             id.writeTo(out);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 9c011bb..633285a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -65,14 +65,12 @@ public class SubscriptionInfoTest {
 
         final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks);
         final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
-        assertEquals(activeTasks, decode.prevTasks);
-        assertEquals(standbyTasks, decode.standbyTasks);
-        assertEquals(processId, decode.processId);
-        assertNull(decode.userEndPoint);
-
+        assertEquals(activeTasks, decode.prevTasks());
+        assertEquals(standbyTasks, decode.standbyTasks());
+        assertEquals(processId, decode.processId());
+        assertNull(decode.userEndPoint());
     }
 
-
     /**
      * This is a clone of what the V1 encoding did. The encode method has changed for V2
      * so it is impossible to test compatibility without having this

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.