You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/11 08:30:47 UTC
[kafka] branch trunk updated: KAFKA-6718: Update SubscriptionInfoData with clientTags (#10802)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87eb0cf KAFKA-6718: Update SubscriptionInfoData with clientTags (#10802)
87eb0cf is described below
commit 87eb0cf03c879fe77e6e72b490fcf0b9621dd559
Author: Levani Kokhreidze <le...@transferwise.com>
AuthorDate: Fri Mar 11 10:29:05 2022 +0200
KAFKA-6718: Update SubscriptionInfoData with clientTags (#10802)
Adds ClientTags to SubscriptionInfoData (subscription protocol version 11).
Reviewer: Luke Chen <sh...@gmail.com>, Bruno Cadonna <ca...@apache.org>
---
.../internals/StreamsPartitionAssignor.java | 4 +-
.../internals/assignment/AssignmentInfo.java | 2 +
.../StreamsAssignmentProtocolVersions.java | 2 +-
.../internals/assignment/SubscriptionInfo.java | 28 ++++++-
.../common/message/SubscriptionInfoData.json | 23 +++++-
...ghAvailabilityStreamsPartitionAssignorTest.java | 2 +-
.../internals/StreamsPartitionAssignorTest.java | 3 +-
.../internals/assignment/AssignmentTestUtils.java | 6 +-
.../internals/assignment/SubscriptionInfoTest.java | 85 ++++++++++++++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 9 ++-
.../tests/streams/streams_upgrade_test.py | 2 +-
11 files changed, 132 insertions(+), 34 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 03f826a..d2fa905 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -264,7 +264,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
userEndPoint,
taskOffsetSums,
uniqueField,
- assignmentErrorCode.get()
+ assignmentErrorCode.get(),
+ Collections.emptyMap()
).encode();
}
@@ -1307,6 +1308,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
case 8:
case 9:
case 10:
+ case 11:
validateActiveTaskEncoding(partitions, info, logPrefix);
activeTasks = getActiveTasks(partitions, info);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index ecf0b25..350426e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -185,6 +185,7 @@ public class AssignmentInfo {
case 8:
case 9:
case 10:
+ case 11:
out.writeInt(usedVersion);
out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
@@ -361,6 +362,7 @@ public class AssignmentInfo {
case 8:
case 9:
case 10:
+ case 11:
commonlySupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeActiveTasks(assignmentInfo, in);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
index 9bd7142..47169ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
@@ -20,7 +20,7 @@ public final class StreamsAssignmentProtocolVersions {
public static final int UNKNOWN = -1;
public static final int EARLIEST_PROBEABLE_VERSION = 3;
public static final int MIN_NAMED_TOPOLOGY_VERSION = 10;
- public static final int LATEST_SUPPORTED_VERSION = 10;
+ public static final int LATEST_SUPPORTED_VERSION = 11;
/*
* Any time you modify the subscription or assignment info, you need to bump the latest supported version, unless
* the version has already been bumped within the current release cycle.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 58d2dbe..71168c7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData;
+import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.ClientTag;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.PartitionToOffsetSum;
import org.apache.kafka.streams.internals.generated.SubscriptionInfoData.TaskOffsetSum;
import org.apache.kafka.streams.processor.TaskId;
@@ -87,7 +88,8 @@ public class SubscriptionInfo {
final String userEndPoint,
final Map<TaskId, Long> taskOffsetSums,
final byte uniqueField,
- final int errorCode) {
+ final int errorCode,
+ final Map<String, String> clientTags) {
validateVersions(version, latestSupportedVersion);
final SubscriptionInfoData data = new SubscriptionInfoData();
data.setVersion(version);
@@ -108,6 +110,9 @@ public class SubscriptionInfo {
if (version >= 9) {
data.setErrorCode(errorCode);
}
+ if (version >= 11) {
+ data.setClientTags(buildClientTagsFromMap(clientTags));
+ }
this.data = data;
@@ -125,10 +130,31 @@ public class SubscriptionInfo {
this.data = subscriptionInfoData;
}
+ public Map<String, String> clientTags() {
+ return data.clientTags().stream()
+ .collect(
+ Collectors.toMap(
+ clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8),
+ clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8)
+ )
+ );
+ }
+
public int errorCode() {
return data.errorCode();
}
+ private List<ClientTag> buildClientTagsFromMap(final Map<String, String> clientTags) {
+ return clientTags.entrySet().stream()
+ .map(clientTagEntry -> {
+ final ClientTag clientTag = new ClientTag();
+ clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8));
+ clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8));
+ return clientTag;
+ })
+ .collect(Collectors.toList());
+ }
+
// For version > MIN_NAMED_TOPOLOGY_VERSION
private void setTaskOffsetSumDataWithNamedTopologiesFromTaskOffsetSumMap(final Map<TaskId, Long> taskOffsetSums) {
data.setTaskOffsetSums(taskOffsetSums.entrySet().stream().map(t -> {
diff --git a/streams/src/main/resources/common/message/SubscriptionInfoData.json b/streams/src/main/resources/common/message/SubscriptionInfoData.json
index f9a830e..6304478 100644
--- a/streams/src/main/resources/common/message/SubscriptionInfoData.json
+++ b/streams/src/main/resources/common/message/SubscriptionInfoData.json
@@ -15,7 +15,7 @@
{
"name": "SubscriptionInfoData",
- "validVersions": "1-10",
+ "validVersions": "1-11",
"flexibleVersions": "none",
"fields": [
{
@@ -65,6 +65,11 @@
"name": "errorCode",
"versions": "9+",
"type": "int32"
+ },
+ {
+ "name": "clientTags",
+ "versions": "11+",
+ "type": "[]ClientTag"
}
],
"commonStructs": [
@@ -136,6 +141,22 @@
"type": "int64"
}
]
+ },
+ {
+ "name": "ClientTag",
+ "versions": "11+",
+ "fields": [
+ {
+ "name": "key",
+ "versions": "11+",
+ "type": "bytes"
+ },
+ {
+ "name": "value",
+ "versions": "11+",
+ "type": "bytes"
+ }
+ ]
}
],
"type": "data"
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
index 9c2d119..e67793d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
@@ -328,7 +328,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
private static SubscriptionInfo getInfo(final UUID processId,
final Set<TaskId> prevTasks) {
return new SubscriptionInfo(
- LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks), (byte) 0, 0);
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks), (byte) 0, 0, Collections.emptyMap());
}
// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 23e8d20..d11f3e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -105,6 +105,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
@@ -2332,7 +2333,7 @@ public class StreamsPartitionAssignorTest {
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks) {
return new SubscriptionInfo(
- version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0);
+ version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
}
// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index 627114a..cf8a6b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -147,7 +147,7 @@ public final class AssignmentTestUtils {
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks) {
return new SubscriptionInfo(
- LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0);
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
}
public static SubscriptionInfo getInfo(final UUID processId,
@@ -155,7 +155,7 @@ public final class AssignmentTestUtils {
final Set<TaskId> standbyTasks,
final String userEndPoint) {
return new SubscriptionInfo(
- LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0);
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0, 0, EMPTY_CLIENT_TAGS);
}
public static SubscriptionInfo getInfo(final UUID processId,
@@ -163,7 +163,7 @@ public final class AssignmentTestUtils {
final Set<TaskId> standbyTasks,
final byte uniqueField) {
return new SubscriptionInfo(
- LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0);
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, EMPTY_CLIENT_TAGS);
}
// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index dd65196..b635a50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -30,6 +30,7 @@ import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_0_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_1_0;
@@ -50,6 +51,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Subscripti
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anEmptyMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
@@ -81,12 +83,12 @@ public class SubscriptionInfoTest {
mkEntry(NAMED_TASK_T2_0_0, 10L),
mkEntry(NAMED_TASK_T2_2_0, 5L)
);
+ private static final Map<String, String> CLIENT_TAGS = mkMap(mkEntry("t1", "v1"), mkEntry("t2", "v2"));
private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
private static final byte IGNORED_UNIQUE_FIELD = (byte) 0;
private static final int IGNORED_ERROR_CODE = 0;
-
@Test
public void shouldThrowForUnknownVersion1() {
assertThrows(IllegalArgumentException.class, () -> new SubscriptionInfo(
@@ -96,7 +98,8 @@ public class SubscriptionInfoTest {
"localhost:80",
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS
));
}
@@ -109,7 +112,8 @@ public class SubscriptionInfoTest {
"localhost:80",
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS
));
}
@@ -122,7 +126,8 @@ public class SubscriptionInfoTest {
IGNORED_USER_ENDPOINT,
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(1, decoded.version());
@@ -142,7 +147,8 @@ public class SubscriptionInfoTest {
"ignoreme",
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS
);
final ByteBuffer buffer = info.encode();
@@ -185,7 +191,8 @@ public class SubscriptionInfoTest {
"localhost:80",
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(2, decoded.version());
@@ -205,7 +212,8 @@ public class SubscriptionInfoTest {
"localhost:80",
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS
);
final ByteBuffer buffer = info.encode();
@@ -249,7 +257,8 @@ public class SubscriptionInfoTest {
"localhost:80",
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(version, decoded.version());
@@ -271,7 +280,8 @@ public class SubscriptionInfoTest {
"localhost:80",
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS
);
final ByteBuffer buffer = info.encode();
@@ -312,7 +322,7 @@ public class SubscriptionInfoTest {
@Test
public void shouldEncodeAndDecodeVersion5() {
final SubscriptionInfo info =
- new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
assertEquals(info, SubscriptionInfo.decode(info.encode()));
}
@@ -329,23 +339,23 @@ public class SubscriptionInfoTest {
final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
final SubscriptionInfo info =
- new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
final SubscriptionInfo expectedInfo =
- new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
@Test
public void shouldEncodeAndDecodeVersion7() {
final SubscriptionInfo info =
- new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
assertThat(info, is(SubscriptionInfo.decode(info.encode())));
}
@Test
public void shouldConvertTaskOffsetSumMapToTaskSets() {
final SubscriptionInfo info =
- new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
assertThat(info.prevTasks(), is(ACTIVE_TASKS));
assertThat(info.standbyTasks(), is(STANDBY_TASKS));
}
@@ -358,8 +368,8 @@ public class SubscriptionInfoTest {
"localhost:80",
TASK_OFFSET_SUMS,
IGNORED_UNIQUE_FIELD,
- IGNORED_ERROR_CODE
- ).encode());
+ IGNORED_ERROR_CODE,
+ EMPTY_CLIENT_TAGS).encode());
assertThat(info.taskOffsetSums(), is(TASK_OFFSET_SUMS));
}
@@ -389,14 +399,14 @@ public class SubscriptionInfoTest {
@Test
public void shouldEncodeAndDecodeVersion8() {
final SubscriptionInfo info =
- new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
assertThat(info, is(SubscriptionInfo.decode(info.encode())));
}
@Test
public void shouldNotErrorAccessingFutureVars() {
final SubscriptionInfo info =
- new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
try {
info.errorCode();
} catch (final Exception e) {
@@ -407,21 +417,21 @@ public class SubscriptionInfoTest {
@Test
public void shouldEncodeAndDecodeVersion9() {
final SubscriptionInfo info =
- new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
assertThat(info, is(SubscriptionInfo.decode(info.encode())));
}
@Test
public void shouldEncodeAndDecodeVersion10() {
final SubscriptionInfo info =
- new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
assertThat(info, is(SubscriptionInfo.decode(info.encode())));
}
@Test
public void shouldEncodeAndDecodeVersion10WithNamedTopologies() {
final SubscriptionInfo info =
- new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+ new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
assertThat(info, is(SubscriptionInfo.decode(info.encode())));
}
@@ -429,10 +439,41 @@ public class SubscriptionInfoTest {
public void shouldThrowIfAttemptingToUseNamedTopologiesWithOlderVersion() {
assertThrows(
TaskAssignmentException.class,
- () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE)
+ () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS)
);
}
+ @Test
+ public void shouldEncodeAndDecodeVersion11() {
+ final SubscriptionInfo info =
+ new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, CLIENT_TAGS);
+ assertThat(info, is(SubscriptionInfo.decode(info.encode())));
+ }
+
+ @Test
+ public void shouldReturnEncodeDecodeEmptyClientTagsOnVersion11() {
+ final SubscriptionInfo info =
+ new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, EMPTY_CLIENT_TAGS);
+ assertThat(info.clientTags(), is(anEmptyMap()));
+ assertThat(info, is(SubscriptionInfo.decode(info.encode())));
+ }
+
+ @Test
+ public void shouldReturnEmptyMapOfClientTagsOnOlderVersions() {
+ final SubscriptionInfo info =
+ new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, CLIENT_TAGS);
+
+ assertThat(info.clientTags(), is(anEmptyMap()));
+ }
+
+ @Test
+ public void shouldReturnMapOfClientTagsOnVersion11() {
+ final SubscriptionInfo info =
+ new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, CLIENT_TAGS);
+
+ assertThat(info.clientTags(), is(CLIENT_TAGS));
+ }
+
private static ByteBuffer encodeFutureVersion() {
final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
+ 4 /* supported version */);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 94f623e..3f255be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -62,6 +62,8 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
public class StreamsUpgradeTest {
@@ -121,6 +123,7 @@ public class StreamsUpgradeTest {
}
public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor {
+ private static final Map<String, String> CLIENT_TAGS = mkMap(mkEntry("t1", "v1"), mkEntry("t2", "v2"));
private final Logger log = LoggerFactory.getLogger(FutureStreamsPartitionAssignor.class);
private AtomicInteger usedSubscriptionMetadataVersionPeek;
@@ -163,7 +166,8 @@ public class StreamsUpgradeTest {
userEndPoint(),
taskManager.getTaskOffsetSums(),
uniqueField,
- 0
+ 0,
+ CLIENT_TAGS
).encode();
} else {
return new FutureSubscriptionInfo(
@@ -264,7 +268,8 @@ public class StreamsUpgradeTest {
info.userEndPoint(),
taskManager().getTaskOffsetSums(),
(byte) 0,
- 0
+ 0,
+ CLIENT_TAGS
).encode(),
subscription.ownedPartitions()
));
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 1507310..57c89aa 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -477,7 +477,7 @@ class StreamsUpgradeTest(Test):
monitors[first_other_processor] = first_other_monitor
monitors[second_other_processor] = second_other_monitor
- highest_version = 10
+ highest_version = 11
version_probing_message = "Sent a version " + str(highest_version + 1) + " subscription and got version " + str(highest_version) + " assignment back (successful version probing). Downgrade subscription metadata to commonly supported version " + str(highest_version) + " and trigger new rebalance."
end_of_upgrade_message = "Sent a version " + str(highest_version) + " subscription and group.s latest commonly supported version is " + str(highest_version + 1) + " (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to " + str(highest_version + 1) + " for next rebalance."
end_of_upgrade_error_message = "Could not detect 'successful version probing and end of rolling upgrade' at upgraded node "