You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/19 19:49:21 UTC

[kafka] branch 2.6 updated: KAFKA-10455: Ensure that probing rebalances always occur (#9383)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 80e6011  KAFKA-10455: Ensure that probing rebalances always occur (#9383)
80e6011 is described below

commit 80e601184e31e9e95c238b7e1e73b3522d92f392
Author: leah <lt...@confluent.io>
AuthorDate: Mon Oct 19 13:29:35 2020 -0500

    KAFKA-10455: Ensure that probing rebalances always occur (#9383)
    
    Add a one-byte counter (uniqueField) to the subscription user data and
    increment it on every call to subscriptionUserData, so that the encoded
    user data is different each time a member rejoins.
    
    Reviewers: A. Sophie Blee-Goldman <ab...@apache.org>, John Roesler <vv...@apache.org>
---
 .../internals/StreamsPartitionAssignor.java        | 12 ++++-
 .../internals/assignment/AssignmentInfo.java       |  2 +
 .../StreamsAssignmentProtocolVersions.java         |  2 +-
 .../internals/assignment/SubscriptionInfo.java     |  6 ++-
 .../common/message/SubscriptionInfoData.json       |  7 ++-
 ...ighAvailabilityTaskAssignorIntegrationTest.java |  4 +-
 ...ghAvailabilityStreamsPartitionAssignorTest.java |  2 +-
 .../internals/StreamsPartitionAssignorTest.java    | 56 ++++++++++++++++++++--
 .../internals/assignment/AssignmentTestUtils.java  |  2 +-
 .../internals/assignment/SubscriptionInfoTest.java | 45 +++++++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  9 ++--
 11 files changed, 118 insertions(+), 29 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 304ec30..ef4bca9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -182,6 +182,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
     private AssignmentListener assignmentListener;
 
     private Supplier<TaskAssignor> taskAssignorSupplier;
+    private byte uniqueField;
 
     /**
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible since the former needs
@@ -212,6 +213,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
         taskAssignorSupplier = assignorConfiguration::taskAssignor;
         assignmentListener = assignorConfiguration.assignmentListener();
+        uniqueField = 0;
     }
 
     @Override
@@ -234,15 +236,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         // Adds the following information to subscription
         // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
         // 2. Map from task id to its overall lag
+        // 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
 
         handleRebalanceStart(topics);
+        uniqueField++;
 
         return new SubscriptionInfo(
             usedSubscriptionMetadataVersion,
             LATEST_SUPPORTED_VERSION,
             taskManager.processId(),
             userEndPoint,
-            taskManager.getTaskOffsetSums())
+            taskManager.getTaskOffsetSums(),
+            uniqueField)
                 .encode();
     }
 
@@ -1398,6 +1403,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                 encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
                 break;
             case 7:
+            case 8:
                 validateActiveTaskEncoding(partitions, info, logPrefix);
 
                 activeTasks = getActiveTasks(partitions, info);
@@ -1555,6 +1561,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         return taskManager;
     }
 
+    protected byte uniqueField() {
+        return uniqueField;
+    }
+
     protected void handleRebalanceStart(final Set<String> topics) {
         taskManager.handleRebalanceStart(topics);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index aa2d400..7183eb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -179,6 +179,7 @@ public class AssignmentInfo {
                     out.writeInt(errCode);
                     break;
                 case 7:
+                case 8:
                     out.writeInt(usedVersion);
                     out.writeInt(commonlySupportedVersion);
                     encodeActiveAndStandbyTaskAssignment(out);
@@ -352,6 +353,7 @@ public class AssignmentInfo {
                     assignmentInfo.errCode = in.readInt();
                     break;
                 case 7:
+                case 8:
                     commonlySupportedVersion = in.readInt();
                     assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeActiveTasks(assignmentInfo, in);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
index 51d70a8..aa33406 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
 public final class StreamsAssignmentProtocolVersions {
     public static final int UNKNOWN = -1;
     public static final int EARLIEST_PROBEABLE_VERSION = 3;
-    public static final int LATEST_SUPPORTED_VERSION = 7;
+    public static final int LATEST_SUPPORTED_VERSION = 8;
 
     private StreamsAssignmentProtocolVersions() {}
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 49496cf..0129853 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -81,7 +81,8 @@ public class SubscriptionInfo {
                             final int latestSupportedVersion,
                             final UUID processId,
                             final String userEndPoint,
-                            final Map<TaskId, Long> taskOffsetSums) {
+                            final Map<TaskId, Long> taskOffsetSums,
+                            final byte uniqueField) {
         validateVersions(version, latestSupportedVersion);
         final SubscriptionInfoData data = new SubscriptionInfoData();
         data.setVersion(version);
@@ -95,6 +96,9 @@ public class SubscriptionInfo {
         if (version >= 3) {
             data.setLatestSupportedVersion(latestSupportedVersion);
         }
+        if (version >= 8) {
+            data.setUniqueField(uniqueField);
+        }
 
         this.data = data;
 
diff --git a/streams/src/main/resources/common/message/SubscriptionInfoData.json b/streams/src/main/resources/common/message/SubscriptionInfoData.json
index af0d2f8..f80ea54 100644
--- a/streams/src/main/resources/common/message/SubscriptionInfoData.json
+++ b/streams/src/main/resources/common/message/SubscriptionInfoData.json
@@ -15,7 +15,7 @@
 
 {
   "name": "SubscriptionInfoData",
-  "validVersions": "1-7",
+  "validVersions": "1-8",
   "fields": [
     {
       "name": "version",
@@ -52,6 +52,11 @@
       "name": "taskOffsetSums",
       "versions": "7+",
       "type": "[]TaskOffsetSum"
+    },
+    {
+      "name": "uniqueField",
+      "versions": "8+",
+      "type": "int8"
     }
   ],
   "commonStructs": [
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index 7e7cf29..530a854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -286,7 +286,9 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
                 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "60000"),
                 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
                 mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"),
-                mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName())
+                mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()),
+                // Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455)
+                mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40)
             )
         );
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
index deccfa4..35940de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
@@ -322,7 +322,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
     private static SubscriptionInfo getInfo(final UUID processId,
                                             final Set<TaskId> prevTasks) {
         return new SubscriptionInfo(
-            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks));
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks), (byte) 0);
     }
 
     // Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 6d68498..b75d278 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -186,6 +186,7 @@ public class StreamsPartitionAssignorTest {
     private final AtomicInteger assignmentError = new AtomicInteger();
     private final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
     private final MockTime time = new MockTime();
+    private final byte uniqueField = 1;
 
     private Map<String, Object> configProps() {
         final Map<String, Object> configurationMap = new HashMap<>();
@@ -509,7 +510,7 @@ public class StreamsPartitionAssignorTest {
         Collections.sort(subscription.topics());
         assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-        final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);
+        final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks, uniqueField);
         assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
     }
 
@@ -535,7 +536,7 @@ public class StreamsPartitionAssignorTest {
         Collections.sort(subscription.topics());
         assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-        final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);
+        final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks, uniqueField);
         assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
     }
 
@@ -1958,6 +1959,34 @@ public class StreamsPartitionAssignorTest {
         EasyMock.verify(consumerClient);
     }
 
+    @Test
+    public void testUniqueField() {
+        createDefaultMockTaskManager();
+        configureDefaultPartitionAssignor();
+        final Set<String> topics = mkSet("input");
+
+        assertEquals(0, partitionAssignor.uniqueField());
+        partitionAssignor.subscriptionUserData(topics);
+        assertEquals(1, partitionAssignor.uniqueField());
+        partitionAssignor.subscriptionUserData(topics);
+        assertEquals(2, partitionAssignor.uniqueField());
+
+    }
+
+    @Test
+    public void testUniqueFieldOverflow() {
+        createDefaultMockTaskManager();
+        configureDefaultPartitionAssignor();
+        final Set<String> topics = mkSet("input");
+
+        for (int i = 0; i < 127; i++) {
+            partitionAssignor.subscriptionUserData(topics);
+        }
+        assertEquals(127, partitionAssignor.uniqueField());
+        partitionAssignor.subscriptionUserData(topics);
+        assertEquals(-128, partitionAssignor.uniqueField());
+    }
+
     private static ByteBuffer encodeFutureSubscription() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
         buf.putInt(LATEST_SUPPORTED_VERSION + 1);
@@ -2073,7 +2102,15 @@ public class StreamsPartitionAssignorTest {
                                             final Set<TaskId> prevTasks,
                                             final Set<TaskId> standbyTasks) {
         return new SubscriptionInfo(
-            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks));
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0);
+    }
+
+    private static SubscriptionInfo getInfo(final UUID processId,
+                                            final Set<TaskId> prevTasks,
+                                            final Set<TaskId> standbyTasks,
+                                            final byte uniqueField) {
+        return new SubscriptionInfo(
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField);
     }
 
     private static SubscriptionInfo getInfo(final UUID processId,
@@ -2081,7 +2118,16 @@ public class StreamsPartitionAssignorTest {
                                             final Set<TaskId> standbyTasks,
                                             final String userEndPoint) {
         return new SubscriptionInfo(
-            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks));
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0);
+    }
+
+    private static SubscriptionInfo getInfo(final UUID processId,
+                                            final Set<TaskId> prevTasks,
+                                            final Set<TaskId> standbyTasks,
+                                            final String userEndPoint,
+                                            final byte uniqueField) {
+        return new SubscriptionInfo(
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField);
     }
 
     private static SubscriptionInfo getInfoForOlderVersion(final int version,
@@ -2089,7 +2135,7 @@ public class StreamsPartitionAssignorTest {
                                                            final Set<TaskId> prevTasks,
                                                            final Set<TaskId> standbyTasks) {
         return new SubscriptionInfo(
-            version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks));
+            version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0);
     }
 
     // Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index f0139ff..2bba47b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -86,7 +86,7 @@ public final class AssignmentTestUtils {
         }
         return clientStates;
     }
-
+    
     /**
      * Builds a UUID by repeating the given number n. For valid n, it is guaranteed that the returned UUIDs satisfy
      * the same relation relative to others as their parameter n does: iff n < m, then uuidForInt(n) < uuidForInt(m)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index b76ce59..981118e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -59,6 +59,7 @@ public class SubscriptionInfoTest {
     );
 
     private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
+    private static final byte IGNORED_UNIQUE_FIELD = (byte) 0;
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowForUnknownVersion1() {
@@ -67,7 +68,8 @@ public class SubscriptionInfoTest {
             LATEST_SUPPORTED_VERSION,
             UUID_1,
             "localhost:80",
-            TASK_OFFSET_SUMS
+            TASK_OFFSET_SUMS,
+            IGNORED_UNIQUE_FIELD
         );
     }
 
@@ -78,7 +80,8 @@ public class SubscriptionInfoTest {
             LATEST_SUPPORTED_VERSION,
             UUID_1,
             "localhost:80",
-            TASK_OFFSET_SUMS
+            TASK_OFFSET_SUMS,
+            IGNORED_UNIQUE_FIELD
         );
     }
 
@@ -89,7 +92,8 @@ public class SubscriptionInfoTest {
             LATEST_SUPPORTED_VERSION,
             UUID_1,
             IGNORED_USER_ENDPOINT,
-            TASK_OFFSET_SUMS
+            TASK_OFFSET_SUMS,
+            IGNORED_UNIQUE_FIELD
         );
         final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
         assertEquals(1, decoded.version());
@@ -107,7 +111,8 @@ public class SubscriptionInfoTest {
             1234,
             UUID_1,
             "ignoreme",
-            TASK_OFFSET_SUMS
+            TASK_OFFSET_SUMS,
+            IGNORED_UNIQUE_FIELD
         );
         final ByteBuffer buffer = info.encode();
 
@@ -148,7 +153,8 @@ public class SubscriptionInfoTest {
             LATEST_SUPPORTED_VERSION,
             UUID_1,
             "localhost:80",
-            TASK_OFFSET_SUMS
+            TASK_OFFSET_SUMS,
+            IGNORED_UNIQUE_FIELD
         );
         final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
         assertEquals(2, decoded.version());
@@ -166,7 +172,8 @@ public class SubscriptionInfoTest {
             LATEST_SUPPORTED_VERSION,
             UUID_1,
             "localhost:80",
-            TASK_OFFSET_SUMS
+            TASK_OFFSET_SUMS,
+            IGNORED_UNIQUE_FIELD
         );
         final ByteBuffer buffer = info.encode();
 
@@ -208,7 +215,8 @@ public class SubscriptionInfoTest {
                 LATEST_SUPPORTED_VERSION,
                 UUID_1,
                 "localhost:80",
-                TASK_OFFSET_SUMS
+                TASK_OFFSET_SUMS,
+                IGNORED_UNIQUE_FIELD
             );
             final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
             assertEquals(version, decoded.version());
@@ -228,7 +236,8 @@ public class SubscriptionInfoTest {
                 LATEST_SUPPORTED_VERSION,
                 UUID_1,
                 "localhost:80",
-                TASK_OFFSET_SUMS
+                TASK_OFFSET_SUMS,
+                IGNORED_UNIQUE_FIELD
             );
             final ByteBuffer buffer = info.encode();
 
@@ -269,7 +278,7 @@ public class SubscriptionInfoTest {
     @Test
     public void shouldEncodeAndDecodeVersion5() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         assertEquals(info, SubscriptionInfo.decode(info.encode()));
     }
 
@@ -286,23 +295,23 @@ public class SubscriptionInfoTest {
         final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
 
         final SubscriptionInfo info =
-            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         final SubscriptionInfo expectedInfo =
-            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion7() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldConvertTaskOffsetSumMapToTaskSets() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+            new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
         assertThat(info.prevTasks(), is(ACTIVE_TASKS));
         assertThat(info.standbyTasks(), is(STANDBY_TASKS));
     }
@@ -313,7 +322,8 @@ public class SubscriptionInfoTest {
             new SubscriptionInfo(MIN_VERSION_OFFSET_SUM_SUBSCRIPTION,
                                  LATEST_SUPPORTED_VERSION, UUID_1,
                                  "localhost:80",
-                                 TASK_OFFSET_SUMS)
+                                 TASK_OFFSET_SUMS,
+                                 IGNORED_UNIQUE_FIELD)
                 .encode());
         assertThat(info.taskOffsetSums(), is(TASK_OFFSET_SUMS));
     }
@@ -341,6 +351,13 @@ public class SubscriptionInfoTest {
         assertThat(info.taskOffsetSums(), is(expectedOffsetSumsMap));
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion8() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
+        assertThat(info, is(SubscriptionInfo.decode(info.encode())));
+    }
+
     private static ByteBuffer encodeFutureVersion() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
                                                        + 4 /* supported version */);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index af4d69c..d18657d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -151,14 +151,16 @@ public class StreamsUpgradeTest {
             // 3. Task ids of valid local states on the client's state directory.
             final TaskManager taskManager = taskManager();
             handleRebalanceStart(topics);
-
+            byte uniqueField = 0;
             if (usedSubscriptionMetadataVersion <= LATEST_SUPPORTED_VERSION) {
+                uniqueField++;
                 return new SubscriptionInfo(
                     usedSubscriptionMetadataVersion,
                     LATEST_SUPPORTED_VERSION + 1,
                     taskManager.processId(),
                     userEndPoint(),
-                    taskManager.getTaskOffsetSums()
+                    taskManager.getTaskOffsetSums(),
+                    uniqueField
                 ).encode();
             } else {
                 return new FutureSubscriptionInfo(
@@ -257,7 +259,8 @@ public class StreamsUpgradeTest {
                                 LATEST_SUPPORTED_VERSION,
                                 info.processId(),
                                 info.userEndPoint(),
-                                taskManager().getTaskOffsetSums())
+                                taskManager().getTaskOffsetSums(),
+                                (byte) 0)
                                 .encode(),
                             subscription.ownedPartitions()
                         ));