You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/11 08:30:47 UTC

[kafka] branch trunk updated: KAFKA-6718: Update SubscriptionInfoData with clientTags (#10802)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 87eb0cf  KAFKA-6718: Update SubscriptionInfoData with clientTags (#10802)
87eb0cf is described below

commit 87eb0cf03c879fe77e6e72b490fcf0b9621dd559
Author: Levani Kokhreidze <le...@transferwise.com>
AuthorDate: Fri Mar 11 10:29:05 2022 +0200

    KAFKA-6718: Update SubscriptionInfoData with clientTags (#10802)
    
    Adds a clientTags field to SubscriptionInfoData and bumps the latest supported subscription version to 11.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Bruno Cadonna <ca...@apache.org>
---
 .../internals/StreamsPartitionAssignor.java        |  4 +-
 .../internals/assignment/AssignmentInfo.java       |  2 +
 .../StreamsAssignmentProtocolVersions.java         |  2 +-
 .../internals/assignment/SubscriptionInfo.java     | 28 ++++++-
 .../common/message/SubscriptionInfoData.json       | 23 +++++-
 ...ghAvailabilityStreamsPartitionAssignorTest.java |  2 +-
 .../internals/StreamsPartitionAssignorTest.java    |  3 +-
 .../internals/assignment/AssignmentTestUtils.java  |  6 +-
 .../internals/assignment/SubscriptionInfoTest.java | 85 ++++++++++++++++------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  9 ++-
 .../tests/streams/streams_upgrade_test.py          |  2 +-
 11 files changed, 132 insertions(+), 34 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 03f826a..d2fa905 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -264,7 +264,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             userEndPoint,
             taskOffsetSums,
             uniqueField,
-            assignmentErrorCode.get()
+            assignmentErrorCode.get(),
+            Collections.emptyMap()
         ).encode();
     }
 
@@ -1307,6 +1308,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             case 8:
             case 9:
             case 10:
+            case 11:
                 validateActiveTaskEncoding(partitions, info, logPrefix);
 
                 activeTasks = getActiveTasks(partitions, info);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index ecf0b25..350426e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -185,6 +185,7 @@ public class AssignmentInfo {
                 case 8:
                 case 9:
                 case 10:
+                case 11:
                     out.writeInt(usedVersion);
                     out.writeInt(commonlySupportedVersion);
                     encodeActiveAndStandbyTaskAssignment(out);
@@ -361,6 +362,7 @@ public class AssignmentInfo {
                 case 8:
                 case 9:
                 case 10:
+                case 11:
                     commonlySupportedVersion = in.readInt();
                     assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
                     decodeActiveTasks(assignmentInfo, in);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
index 9bd7142..47169ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
@@ -20,7 +20,7 @@ public final class StreamsAssignmentProtocolVersions {
     public static final int UNKNOWN = -1;
     public static final int EARLIEST_PROBEABLE_VERSION = 3;
     public static final int MIN_NAMED_TOPOLOGY_VERSION = 10;
-    public static final int LATEST_SUPPORTED_VERSION = 10;
+    public static final int LATEST_SUPPORTED_VERSION = 11;
     /*
      * Any time you modify the subscription or assignment info, you need to bump the latest supported version, unless
      * the version has already been bumped within the current release cycle.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 58d2dbe..71168c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.internals.generated.SubscriptionInfoData;
+import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.ClientTag;
 import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.PartitionToOffsetSum;
 import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.TaskOffsetSum;
 import org.apache.kafka.streams.processor.TaskId;
@@ -87,7 +88,8 @@ public class SubscriptionInfo {
                             final String userEndPoint,
                             final Map<TaskId, Long> taskOffsetSums,
                             final byte uniqueField,
-                            final int errorCode) {
+                            final int errorCode,
+                            final Map<String, String> clientTags) {
         validateVersions(version, latestSupportedVersion);
         final SubscriptionInfoData data = new SubscriptionInfoData();
         data.setVersion(version);
@@ -108,6 +110,9 @@ public class SubscriptionInfo {
         if (version >= 9) {
             data.setErrorCode(errorCode);
         }
+        if (version >= 11) {
+            data.setClientTags(buildClientTagsFromMap(clientTags));
+        }
 
         this.data = data;
 
@@ -125,10 +130,31 @@ public class SubscriptionInfo {
         this.data = subscriptionInfoData;
     }
 
+    public Map<String, String> clientTags() {
+        return data.clientTags().stream()
+            .collect(
+                Collectors.toMap(
+                    clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8),
+                    clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8)
+                )
+            );
+    }
+
     public int errorCode() {
         return data.errorCode();
     }
 
+    private List<ClientTag> buildClientTagsFromMap(final Map<String, String> clientTags) {
+        return clientTags.entrySet().stream()
+            .map(clientTagEntry -> {
+                final ClientTag clientTag = new ClientTag();
+                clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8));
+                clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8));
+                return clientTag;
+            })
+            .collect(Collectors.toList());
+    }
+
     // For version > MIN_NAMED_TOPOLOGY_VERSION
     private void setTaskOffsetSumDataWithNamedTopologiesFromTaskOffsetSumMap(final Map<TaskId, Long> taskOffsetSums) {
         data.setTaskOffsetSums(taskOffsetSums.entrySet().stream().map(t -> {
diff --git a/streams/src/main/resources/common/message/SubscriptionInfoData.json b/streams/src/main/resources/common/message/SubscriptionInfoData.json
index f9a830e..6304478 100644
--- a/streams/src/main/resources/common/message/SubscriptionInfoData.json
+++ b/streams/src/main/resources/common/message/SubscriptionInfoData.json
@@ -15,7 +15,7 @@
 
 {
   "name": "SubscriptionInfoData",
-  "validVersions": "1-10",
+  "validVersions": "1-11",
   "flexibleVersions": "none",
   "fields": [
     {
@@ -65,6 +65,11 @@
       "name": "errorCode",
       "versions": "9+",
       "type": "int32"
+    },
+    {
+      "name": "clientTags",
+      "versions": "11+",
+      "type": "[]ClientTag"
     }
   ],
   "commonStructs": [
@@ -136,6 +141,22 @@
           "type": "int64"
         }
       ]
+    },
+    {
+      "name": "ClientTag",
+      "versions": "11+",
+      "fields": [
+        {
+          "name": "key",
+          "versions": "11+",
+          "type": "bytes"
+        },
+        {
+          "name": "value",
+          "versions": "11+",
+          "type": "bytes"
+        }
+      ]
     }
   ],
   "type": "data"
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
index 9c2d119..e67793d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
@@ -328,7 +328,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
     private static SubscriptionInfo getInfo(final UUID processId,
                                             final Set<TaskId> prevTasks) {
         return new SubscriptionInfo(
-            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks), (byte) 0, 0);
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks), (byte) 0, 0, Collections.emptyMap());
     }
 
     // Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 23e8d20..d11f3e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -105,6 +105,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.common.utils.Utils.mkSortedSet;
 import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
@@ -2332,7 +2333,7 @@ public class StreamsPartitionAssignorTest {
                                                            final Set<TaskId> prevTasks,
                                                            final Set<TaskId> standbyTasks) {
         return new SubscriptionInfo(
-            version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0);
+            version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
     }
 
     // Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 627114a..cf8a6b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -147,7 +147,7 @@ public final class AssignmentTestUtils {
                                            final Set<TaskId> prevTasks,
                                            final Set<TaskId> standbyTasks) {
         return new SubscriptionInfo(
-            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0);
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
     }
 
     public static SubscriptionInfo getInfo(final UUID processId,
@@ -155,7 +155,7 @@ public final class AssignmentTestUtils {
                                            final Set<TaskId> standbyTasks,
                                            final String userEndPoint) {
         return new SubscriptionInfo(
-            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0,  0);
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
     }
 
     public static SubscriptionInfo getInfo(final UUID processId,
@@ -163,7 +163,7 @@ public final class AssignmentTestUtils {
                                            final Set<TaskId> standbyTasks,
                                            final byte uniqueField) {
         return new SubscriptionInfo(
-            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0);
+            LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, EMPTY_CLIENT_TAGS);
     }
 
     // Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index dd65196..b635a50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_0_0;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_0_1;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_1_0;
@@ -50,6 +51,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Subscripti
 import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anEmptyMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
@@ -81,12 +83,12 @@ public class SubscriptionInfoTest {
         mkEntry(NAMED_TASK_T2_0_0, 10L),
         mkEntry(NAMED_TASK_T2_2_0, 5L)
         );
+    private static final Map<String, String> CLIENT_TAGS = mkMap(mkEntry("t1", "v1"), mkEntry("t2", "v2"));
 
     private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
     private static final byte IGNORED_UNIQUE_FIELD = (byte) 0;
     private static final int IGNORED_ERROR_CODE = 0;
 
-
     @Test
     public void shouldThrowForUnknownVersion1() {
         assertThrows(IllegalArgumentException.class, () -> new SubscriptionInfo(
@@ -96,7 +98,8 @@ public class SubscriptionInfoTest {
             "localhost:80",
             TASK_OFFSET_SUMS,
             IGNORED_UNIQUE_FIELD,
-            IGNORED_ERROR_CODE
+            IGNORED_ERROR_CODE,
+            EMPTY_CLIENT_TAGS
         ));
     }
 
@@ -109,7 +112,8 @@ public class SubscriptionInfoTest {
             "localhost:80",
             TASK_OFFSET_SUMS,
             IGNORED_UNIQUE_FIELD,
-            IGNORED_ERROR_CODE
+            IGNORED_ERROR_CODE,
+            EMPTY_CLIENT_TAGS
         ));
     }
 
@@ -122,7 +126,8 @@ public class SubscriptionInfoTest {
             IGNORED_USER_ENDPOINT,
             TASK_OFFSET_SUMS,
             IGNORED_UNIQUE_FIELD,
-            IGNORED_ERROR_CODE
+            IGNORED_ERROR_CODE,
+            EMPTY_CLIENT_TAGS
         );
         final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
         assertEquals(1, decoded.version());
@@ -142,7 +147,8 @@ public class SubscriptionInfoTest {
             "ignoreme",
             TASK_OFFSET_SUMS,
             IGNORED_UNIQUE_FIELD,
-            IGNORED_ERROR_CODE
+            IGNORED_ERROR_CODE,
+            EMPTY_CLIENT_TAGS
         );
         final ByteBuffer buffer = info.encode();
 
@@ -185,7 +191,8 @@ public class SubscriptionInfoTest {
             "localhost:80",
             TASK_OFFSET_SUMS,
             IGNORED_UNIQUE_FIELD,
-            IGNORED_ERROR_CODE
+            IGNORED_ERROR_CODE,
+            EMPTY_CLIENT_TAGS
         );
         final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
         assertEquals(2, decoded.version());
@@ -205,7 +212,8 @@ public class SubscriptionInfoTest {
             "localhost:80",
             TASK_OFFSET_SUMS,
             IGNORED_UNIQUE_FIELD,
-            IGNORED_ERROR_CODE
+            IGNORED_ERROR_CODE,
+            EMPTY_CLIENT_TAGS
         );
         final ByteBuffer buffer = info.encode();
 
@@ -249,7 +257,8 @@ public class SubscriptionInfoTest {
                 "localhost:80",
                 TASK_OFFSET_SUMS,
                 IGNORED_UNIQUE_FIELD,
-                IGNORED_ERROR_CODE
+                IGNORED_ERROR_CODE,
+                EMPTY_CLIENT_TAGS
             );
             final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
             assertEquals(version, decoded.version());
@@ -271,7 +280,8 @@ public class SubscriptionInfoTest {
                 "localhost:80",
                 TASK_OFFSET_SUMS,
                 IGNORED_UNIQUE_FIELD,
-                IGNORED_ERROR_CODE
+                IGNORED_ERROR_CODE,
+                EMPTY_CLIENT_TAGS
             );
             final ByteBuffer buffer = info.encode();
 
@@ -312,7 +322,7 @@ public class SubscriptionInfoTest {
     @Test
     public void shouldEncodeAndDecodeVersion5() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         assertEquals(info, SubscriptionInfo.decode(info.encode()));
     }
 
@@ -329,23 +339,23 @@ public class SubscriptionInfoTest {
         final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
 
         final SubscriptionInfo info =
-            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         final SubscriptionInfo expectedInfo =
-            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion7() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldConvertTaskOffsetSumMapToTaskSets() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         assertThat(info.prevTasks(), is(ACTIVE_TASKS));
         assertThat(info.standbyTasks(), is(STANDBY_TASKS));
     }
@@ -358,8 +368,8 @@ public class SubscriptionInfoTest {
                                  "localhost:80",
                                  TASK_OFFSET_SUMS,
                                  IGNORED_UNIQUE_FIELD,
-                                 IGNORED_ERROR_CODE
-            ).encode());
+                                 IGNORED_ERROR_CODE,
+                                 EMPTY_CLIENT_TAGS).encode());
         assertThat(info.taskOffsetSums(), is(TASK_OFFSET_SUMS));
     }
 
@@ -389,14 +399,14 @@ public class SubscriptionInfoTest {
     @Test
     public void shouldEncodeAndDecodeVersion8() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldNotErrorAccessingFutureVars() {
         final SubscriptionInfo info =
-                new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+                new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         try {
             info.errorCode();
         } catch (final Exception e) {
@@ -407,21 +417,21 @@ public class SubscriptionInfoTest {
     @Test
     public void shouldEncodeAndDecodeVersion9() {
         final SubscriptionInfo info =
-                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10WithNamedTopologies() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
@@ -429,10 +439,41 @@ public class SubscriptionInfoTest {
     public void shouldThrowIfAttemptingToUseNamedTopologiesWithOlderVersion() {
         assertThrows(
             TaskAssignmentException.class,
-            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE)
+            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS)
         );
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion11() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, CLIENT_TAGS);
+        assertThat(info, is(SubscriptionInfo.decode(info.encode())));
+    }
+
+    @Test
+    public void shouldReturnEncodeDecodeEmptyClientTagsOnVersion11() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
+        assertThat(info.clientTags(), is(anEmptyMap()));
+        assertThat(info, is(SubscriptionInfo.decode(info.encode())));
+    }
+
+    @Test
+    public void shouldReturnEmptyMapOfClientTagsOnOlderVersions() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, CLIENT_TAGS);
+
+        assertThat(info.clientTags(), is(anEmptyMap()));
+    }
+
+    @Test
+    public void shouldReturnMapOfClientTagsOnVersion11() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, CLIENT_TAGS);
+
+        assertThat(info.clientTags(), is(CLIENT_TAGS));
+    }
+
     private static ByteBuffer encodeFutureVersion() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
                                                        + 4 /* supported version */);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 94f623e..3f255be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -62,6 +62,8 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
 public class StreamsUpgradeTest {
@@ -121,6 +123,7 @@ public class StreamsUpgradeTest {
     }
 
     public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor {
+        private static final Map<String, String> CLIENT_TAGS = mkMap(mkEntry("t1", "v1"), mkEntry("t2", "v2"));
         private final Logger log = LoggerFactory.getLogger(FutureStreamsPartitionAssignor.class);
 
         private AtomicInteger usedSubscriptionMetadataVersionPeek;
@@ -163,7 +166,8 @@ public class StreamsUpgradeTest {
                     userEndPoint(),
                     taskManager.getTaskOffsetSums(),
                     uniqueField,
-                    0
+                    0,
+                    CLIENT_TAGS
                 ).encode();
             } else {
                 return new FutureSubscriptionInfo(
@@ -264,7 +268,8 @@ public class StreamsUpgradeTest {
                                 info.userEndPoint(),
                                 taskManager().getTaskOffsetSums(),
                                 (byte) 0,
-                                0
+                                0,
+                                CLIENT_TAGS
                             ).encode(),
                             subscription.ownedPartitions()
                         ));
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 1507310..57c89aa 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -477,7 +477,7 @@ class StreamsUpgradeTest(Test):
                     monitors[first_other_processor] = first_other_monitor
                     monitors[second_other_processor] = second_other_monitor
 
-                    highest_version = 10
+                    highest_version = 11
                     version_probing_message = "Sent a version " + str(highest_version + 1) + " subscription and got version " + str(highest_version) + " assignment back (successful version probing). Downgrade subscription metadata to commonly supported version " + str(highest_version) + " and trigger new rebalance."
                     end_of_upgrade_message = "Sent a version " + str(highest_version) + " subscription and group.s latest commonly supported version is " + str(highest_version + 1) + " (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to " + str(highest_version + 1) + " for next rebalance."
                     end_of_upgrade_error_message = "Could not detect 'successful version probing and end of rolling upgrade' at upgraded node "