You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/19 19:49:21 UTC
[kafka] branch 2.6 updated: KAFKA-10455: Ensure that probing
rebalances always occur (#9383)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 80e6011 KAFKA-10455: Ensure that probing rebalances always occur (#9383)
80e6011 is described below
commit 80e601184e31e9e95c238b7e1e73b3522d92f392
Author: leah <lt...@confluent.io>
AuthorDate: Mon Oct 19 13:29:35 2020 -0500
KAFKA-10455: Ensure that probing rebalances always occur (#9383)
Add dummy data to subscriptionUserData to make sure that
it is different each time a member rejoins.
Reviewers: A. Sophie Blee-Goldman <ab...@apache.org>, John Roesler <vv...@apache.org>
---
.../internals/StreamsPartitionAssignor.java | 12 ++++-
.../internals/assignment/AssignmentInfo.java | 2 +
.../StreamsAssignmentProtocolVersions.java | 2 +-
.../internals/assignment/SubscriptionInfo.java | 6 ++-
.../common/message/SubscriptionInfoData.json | 7 ++-
...ighAvailabilityTaskAssignorIntegrationTest.java | 4 +-
...ghAvailabilityStreamsPartitionAssignorTest.java | 2 +-
.../internals/StreamsPartitionAssignorTest.java | 56 ++++++++++++++++++++--
.../internals/assignment/AssignmentTestUtils.java | 2 +-
.../internals/assignment/SubscriptionInfoTest.java | 45 +++++++++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 9 ++--
11 files changed, 118 insertions(+), 29 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 304ec30..ef4bca9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -182,6 +182,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private AssignmentListener assignmentListener;
private Supplier<TaskAssignor> taskAssignorSupplier;
+ private byte uniqueField;
/**
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible since the former needs
@@ -212,6 +213,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
taskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
+ uniqueField = 0;
}
@Override
@@ -234,15 +236,18 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Map from task id to its overall lag
+ // 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
handleRebalanceStart(topics);
+ uniqueField++;
return new SubscriptionInfo(
usedSubscriptionMetadataVersion,
LATEST_SUPPORTED_VERSION,
taskManager.processId(),
userEndPoint,
- taskManager.getTaskOffsetSums())
+ taskManager.getTaskOffsetSums(),
+ uniqueField)
.encode();
}
@@ -1398,6 +1403,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
break;
case 7:
+ case 8:
validateActiveTaskEncoding(partitions, info, logPrefix);
activeTasks = getActiveTasks(partitions, info);
@@ -1555,6 +1561,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
return taskManager;
}
+ protected byte uniqueField() {
+ return uniqueField;
+ }
+
protected void handleRebalanceStart(final Set<String> topics) {
taskManager.handleRebalanceStart(topics);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index aa2d400..7183eb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -179,6 +179,7 @@ public class AssignmentInfo {
out.writeInt(errCode);
break;
case 7:
+ case 8:
out.writeInt(usedVersion);
out.writeInt(commonlySupportedVersion);
encodeActiveAndStandbyTaskAssignment(out);
@@ -352,6 +353,7 @@ public class AssignmentInfo {
assignmentInfo.errCode = in.readInt();
break;
case 7:
+ case 8:
commonlySupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, commonlySupportedVersion);
decodeActiveTasks(assignmentInfo, in);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
index 51d70a8..aa33406 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StreamsAssignmentProtocolVersions.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
public final class StreamsAssignmentProtocolVersions {
public static final int UNKNOWN = -1;
public static final int EARLIEST_PROBEABLE_VERSION = 3;
- public static final int LATEST_SUPPORTED_VERSION = 7;
+ public static final int LATEST_SUPPORTED_VERSION = 8;
private StreamsAssignmentProtocolVersions() {}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 49496cf..0129853 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -81,7 +81,8 @@ public class SubscriptionInfo {
final int latestSupportedVersion,
final UUID processId,
final String userEndPoint,
- final Map<TaskId, Long> taskOffsetSums) {
+ final Map<TaskId, Long> taskOffsetSums,
+ final byte uniqueField) {
validateVersions(version, latestSupportedVersion);
final SubscriptionInfoData data = new SubscriptionInfoData();
data.setVersion(version);
@@ -95,6 +96,9 @@ public class SubscriptionInfo {
if (version >= 3) {
data.setLatestSupportedVersion(latestSupportedVersion);
}
+ if (version >= 8) {
+ data.setUniqueField(uniqueField);
+ }
this.data = data;
diff --git a/streams/src/main/resources/common/message/SubscriptionInfoData.json b/streams/src/main/resources/common/message/SubscriptionInfoData.json
index af0d2f8..f80ea54 100644
--- a/streams/src/main/resources/common/message/SubscriptionInfoData.json
+++ b/streams/src/main/resources/common/message/SubscriptionInfoData.json
@@ -15,7 +15,7 @@
{
"name": "SubscriptionInfoData",
- "validVersions": "1-7",
+ "validVersions": "1-8",
"fields": [
{
"name": "version",
@@ -52,6 +52,11 @@
"name": "taskOffsetSums",
"versions": "7+",
"type": "[]TaskOffsetSum"
+ },
+ {
+ "name": "uniqueField",
+ "versions": "8+",
+ "type": "int8"
}
],
"commonStructs": [
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index 7e7cf29..530a854 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -286,7 +286,9 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "60000"),
mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"),
- mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName())
+ mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()),
+ // Increasing the number of threads to ensure that a rebalance happens each time a consumer sends a rejoin (KAFKA-10455)
+ mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40)
)
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
index deccfa4..35940de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
@@ -322,7 +322,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
private static SubscriptionInfo getInfo(final UUID processId,
final Set<TaskId> prevTasks) {
return new SubscriptionInfo(
- LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks));
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks), (byte) 0);
}
// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 6d68498..b75d278 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -186,6 +186,7 @@ public class StreamsPartitionAssignorTest {
private final AtomicInteger assignmentError = new AtomicInteger();
private final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
private final MockTime time = new MockTime();
+ private final byte uniqueField = 1;
private Map<String, Object> configProps() {
final Map<String, Object> configurationMap = new HashMap<>();
@@ -509,7 +510,7 @@ public class StreamsPartitionAssignorTest {
Collections.sort(subscription.topics());
assertEquals(asList("topic1", "topic2"), subscription.topics());
- final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);
+ final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks, uniqueField);
assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
}
@@ -535,7 +536,7 @@ public class StreamsPartitionAssignorTest {
Collections.sort(subscription.topics());
assertEquals(asList("topic1", "topic2"), subscription.topics());
- final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);
+ final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks, uniqueField);
assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
}
@@ -1958,6 +1959,34 @@ public class StreamsPartitionAssignorTest {
EasyMock.verify(consumerClient);
}
+ @Test
+ public void testUniqueField() {
+ createDefaultMockTaskManager();
+ configureDefaultPartitionAssignor();
+ final Set<String> topics = mkSet("input");
+
+ assertEquals(0, partitionAssignor.uniqueField());
+ partitionAssignor.subscriptionUserData(topics);
+ assertEquals(1, partitionAssignor.uniqueField());
+ partitionAssignor.subscriptionUserData(topics);
+ assertEquals(2, partitionAssignor.uniqueField());
+
+ }
+
+ @Test
+ public void testUniqueFieldOverflow() {
+ createDefaultMockTaskManager();
+ configureDefaultPartitionAssignor();
+ final Set<String> topics = mkSet("input");
+
+ for (int i = 0; i < 127; i++) {
+ partitionAssignor.subscriptionUserData(topics);
+ }
+ assertEquals(127, partitionAssignor.uniqueField());
+ partitionAssignor.subscriptionUserData(topics);
+ assertEquals(-128, partitionAssignor.uniqueField());
+ }
+
private static ByteBuffer encodeFutureSubscription() {
final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
buf.putInt(LATEST_SUPPORTED_VERSION + 1);
@@ -2073,7 +2102,15 @@ public class StreamsPartitionAssignorTest {
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks) {
return new SubscriptionInfo(
- LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks));
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0);
+ }
+
+ private static SubscriptionInfo getInfo(final UUID processId,
+ final Set<TaskId> prevTasks,
+ final Set<TaskId> standbyTasks,
+ final byte uniqueField) {
+ return new SubscriptionInfo(
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField);
}
private static SubscriptionInfo getInfo(final UUID processId,
@@ -2081,7 +2118,16 @@ public class StreamsPartitionAssignorTest {
final Set<TaskId> standbyTasks,
final String userEndPoint) {
return new SubscriptionInfo(
- LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks));
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0);
+ }
+
+ private static SubscriptionInfo getInfo(final UUID processId,
+ final Set<TaskId> prevTasks,
+ final Set<TaskId> standbyTasks,
+ final String userEndPoint,
+ final byte uniqueField) {
+ return new SubscriptionInfo(
+ LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, userEndPoint, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField);
}
private static SubscriptionInfo getInfoForOlderVersion(final int version,
@@ -2089,7 +2135,7 @@ public class StreamsPartitionAssignorTest {
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks) {
return new SubscriptionInfo(
- version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks));
+ version, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), (byte) 0);
}
// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
index f0139ff..2bba47b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
@@ -86,7 +86,7 @@ public final class AssignmentTestUtils {
}
return clientStates;
}
-
+
/**
* Builds a UUID by repeating the given number n. For valid n, it is guaranteed that the returned UUIDs satisfy
* the same relation relative to others as their parameter n does: iff n < m, then uuidForInt(n) < uuidForInt(m)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index b76ce59..981118e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -59,6 +59,7 @@ public class SubscriptionInfoTest {
);
private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
+ private static final byte IGNORED_UNIQUE_FIELD = (byte) 0;
@Test(expected = IllegalArgumentException.class)
public void shouldThrowForUnknownVersion1() {
@@ -67,7 +68,8 @@ public class SubscriptionInfoTest {
LATEST_SUPPORTED_VERSION,
UUID_1,
"localhost:80",
- TASK_OFFSET_SUMS
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD
);
}
@@ -78,7 +80,8 @@ public class SubscriptionInfoTest {
LATEST_SUPPORTED_VERSION,
UUID_1,
"localhost:80",
- TASK_OFFSET_SUMS
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD
);
}
@@ -89,7 +92,8 @@ public class SubscriptionInfoTest {
LATEST_SUPPORTED_VERSION,
UUID_1,
IGNORED_USER_ENDPOINT,
- TASK_OFFSET_SUMS
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(1, decoded.version());
@@ -107,7 +111,8 @@ public class SubscriptionInfoTest {
1234,
UUID_1,
"ignoreme",
- TASK_OFFSET_SUMS
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD
);
final ByteBuffer buffer = info.encode();
@@ -148,7 +153,8 @@ public class SubscriptionInfoTest {
LATEST_SUPPORTED_VERSION,
UUID_1,
"localhost:80",
- TASK_OFFSET_SUMS
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(2, decoded.version());
@@ -166,7 +172,8 @@ public class SubscriptionInfoTest {
LATEST_SUPPORTED_VERSION,
UUID_1,
"localhost:80",
- TASK_OFFSET_SUMS
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD
);
final ByteBuffer buffer = info.encode();
@@ -208,7 +215,8 @@ public class SubscriptionInfoTest {
LATEST_SUPPORTED_VERSION,
UUID_1,
"localhost:80",
- TASK_OFFSET_SUMS
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD
);
final SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
assertEquals(version, decoded.version());
@@ -228,7 +236,8 @@ public class SubscriptionInfoTest {
LATEST_SUPPORTED_VERSION,
UUID_1,
"localhost:80",
- TASK_OFFSET_SUMS
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD
);
final ByteBuffer buffer = info.encode();
@@ -269,7 +278,7 @@ public class SubscriptionInfoTest {
@Test
public void shouldEncodeAndDecodeVersion5() {
final SubscriptionInfo info =
- new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(5, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
assertEquals(info, SubscriptionInfo.decode(info.encode()));
}
@@ -286,23 +295,23 @@ public class SubscriptionInfoTest {
final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
final SubscriptionInfo info =
- new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
final SubscriptionInfo expectedInfo =
- new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
@Test
public void shouldEncodeAndDecodeVersion7() {
final SubscriptionInfo info =
- new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
assertThat(info, is(SubscriptionInfo.decode(info.encode())));
}
@Test
public void shouldConvertTaskOffsetSumMapToTaskSets() {
final SubscriptionInfo info =
- new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+ new SubscriptionInfo(7, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
assertThat(info.prevTasks(), is(ACTIVE_TASKS));
assertThat(info.standbyTasks(), is(STANDBY_TASKS));
}
@@ -313,7 +322,8 @@ public class SubscriptionInfoTest {
new SubscriptionInfo(MIN_VERSION_OFFSET_SUM_SUBSCRIPTION,
LATEST_SUPPORTED_VERSION, UUID_1,
"localhost:80",
- TASK_OFFSET_SUMS)
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD)
.encode());
assertThat(info.taskOffsetSums(), is(TASK_OFFSET_SUMS));
}
@@ -341,6 +351,13 @@ public class SubscriptionInfoTest {
assertThat(info.taskOffsetSums(), is(expectedOffsetSumsMap));
}
+ @Test
+ public void shouldEncodeAndDecodeVersion8() {
+ final SubscriptionInfo info =
+ new SubscriptionInfo(8, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
+ assertThat(info, is(SubscriptionInfo.decode(info.encode())));
+ }
+
private static ByteBuffer encodeFutureVersion() {
final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
+ 4 /* supported version */);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index af4d69c..d18657d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -151,14 +151,16 @@ public class StreamsUpgradeTest {
// 3. Task ids of valid local states on the client's state directory.
final TaskManager taskManager = taskManager();
handleRebalanceStart(topics);
-
+ byte uniqueField = 0;
if (usedSubscriptionMetadataVersion <= LATEST_SUPPORTED_VERSION) {
+ uniqueField++;
return new SubscriptionInfo(
usedSubscriptionMetadataVersion,
LATEST_SUPPORTED_VERSION + 1,
taskManager.processId(),
userEndPoint(),
- taskManager.getTaskOffsetSums()
+ taskManager.getTaskOffsetSums(),
+ uniqueField
).encode();
} else {
return new FutureSubscriptionInfo(
@@ -257,7 +259,8 @@ public class StreamsUpgradeTest {
LATEST_SUPPORTED_VERSION,
info.processId(),
info.userEndPoint(),
- taskManager().getTaskOffsetSums())
+ taskManager().getTaskOffsetSums(),
+ (byte) 0)
.encode(),
subscription.ownedPartitions()
));