You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/04/18 07:38:36 UTC
[kafka] branch trunk updated: KAFKA-6054: Update Kafka Streams metadata to version 3 (#4880)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cae4221 KAFKA-6054: Update Kafka Streams metadata to version 3 (#4880)
cae4221 is described below
commit cae42215b7597cc39afc682cb0399782cf42fe23
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Apr 18 09:38:27 2018 +0200
KAFKA-6054: Update Kafka Streams metadata to version 3 (#4880)
- adds Streams upgrade tests for 1.1 release
- introduces metadata version 3
Reviewers: John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
build.gradle | 12 ++
gradle/dependencies.gradle | 2 +
settings.gradle | 5 +-
.../org/apache/kafka/streams/StreamsConfig.java | 34 +++-
.../internals/StreamsPartitionAssignor.java | 36 +++-
.../internals/assignment/AssignmentInfo.java | 110 +++++++++---
.../internals/assignment/SubscriptionInfo.java | 184 ++++++++++++++++-----
.../internals/StreamsPartitionAssignorTest.java | 80 +++++++--
.../internals/assignment/AssignmentInfoTest.java | 105 +++++-------
.../internals/assignment/SubscriptionInfoTest.java | 91 ++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 104 ++++++++++++
tests/kafkatest/services/streams.py | 1 +
.../tests/streams/streams_upgrade_test.py | 32 ++--
tests/kafkatest/version.py | 6 +-
14 files changed, 586 insertions(+), 216 deletions(-)
diff --git a/build.gradle b/build.gradle
index 69f560e..8b07a95 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1087,6 +1087,18 @@ project(':streams:upgrade-system-tests-10') {
}
}
+project(':streams:upgrade-system-tests-11') {
+ archivesBaseName = "kafka-streams-upgrade-system-tests-11"
+
+ dependencies {
+ testCompile libs.kafkaStreams_11
+ }
+
+ systemTestLibs {
+ dependsOn testJar
+ }
+}
+
project(':jmh-benchmarks') {
apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index d6beba9..e5f2958 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -67,6 +67,7 @@ versions += [
kafka_0102: "0.10.2.1",
kafka_0110: "0.11.0.2",
kafka_10: "1.0.1",
+ kafka_11: "1.1.0",
lz4: "1.4.1",
metrics: "2.2.0",
// PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
@@ -115,6 +116,7 @@ libs += [
kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
+ kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
log4j: "log4j:log4j:$versions.log4j",
lz4: "org.lz4:lz4-java:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
diff --git a/settings.gradle b/settings.gradle
index 0313684..2a7977c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,5 +15,6 @@
include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
- 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender',
- 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
+ 'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
+ 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
+ 'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 819bebd..65b1da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -173,6 +173,31 @@ public class StreamsConfig extends AbstractConfig {
public static final String UPGRADE_FROM_0100 = "0.10.0";
/**
+ * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
+ */
+ public static final String UPGRADE_FROM_0101 = "0.10.1";
+
+ /**
+ * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
+ */
+ public static final String UPGRADE_FROM_0102 = "0.10.2";
+
+ /**
+ * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
+ */
+ public static final String UPGRADE_FROM_0110 = "0.11.0";
+
+ /**
+ * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
+ */
+ public static final String UPGRADE_FROM_10 = "1.0";
+
+ /**
+ * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
+ */
+ public static final String UPGRADE_FROM_11 = "1.1";
+
+ /**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
*/
public static final String AT_LEAST_ONCE = "at_least_once";
@@ -347,8 +372,9 @@ public class StreamsConfig extends AbstractConfig {
/** {@code upgrade.from} */
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
- public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
- "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+ public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " +
+ "When upgrading from 1.2 to a newer version it is not required to specify this config. " +
+ "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version).";
/**
* {@code value.serde}
@@ -364,7 +390,7 @@ public class StreamsConfig extends AbstractConfig {
/**
* {@code zookeeper.connect}
- * @deprecated Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored.
+ * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored.
*/
@Deprecated
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
@@ -575,7 +601,7 @@ public class StreamsConfig extends AbstractConfig {
.define(UPGRADE_FROM_CONFIG,
ConfigDef.Type.STRING,
null,
- in(null, UPGRADE_FROM_0100),
+ in(null, UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11),
Importance.LOW,
UPGRADE_FROM_DOC)
.define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 97771e5..c81105e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -199,10 +199,24 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
- final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
- if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
- log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
- userMetadataVersion = 1;
+ final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+ if (upgradeFrom != null) {
+ switch (upgradeFrom) {
+ case StreamsConfig.UPGRADE_FROM_0100:
+ log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+ userMetadataVersion = 1;
+ break;
+ case StreamsConfig.UPGRADE_FROM_0101:
+ case StreamsConfig.UPGRADE_FROM_0102:
+ case StreamsConfig.UPGRADE_FROM_0110:
+ case StreamsConfig.UPGRADE_FROM_10:
+ case StreamsConfig.UPGRADE_FROM_11:
+ log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+ userMetadataVersion = 2;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
+ }
}
final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
@@ -512,7 +526,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// construct the global partition assignment per host map
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
- if (minUserMetadataVersion == 2) {
+ if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) {
for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final HostInfo hostInfo = entry.getValue().hostInfo;
@@ -631,6 +645,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
+ case 3:
+ processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+ partitionsByHost = info.partitionsByHost();
+ break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION);
@@ -684,6 +702,13 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
}
+ private void processVersionThreeAssignment(final AssignmentInfo info,
+ final List<TopicPartition> partitions,
+ final Map<TaskId, Set<TopicPartition>> activeTasks,
+ final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
+ processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+ }
+
/**
* Internal helper function that creates a Kafka topic
*
@@ -818,4 +843,5 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index c8df749..3c5cee2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -16,8 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.HostInfo;
@@ -30,6 +30,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -40,15 +41,20 @@ public class AssignmentInfo {
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
- public static final int LATEST_SUPPORTED_VERSION = 2;
+ public static final int LATEST_SUPPORTED_VERSION = 3;
+ public static final int UNKNOWN = -1;
private final int usedVersion;
+ private final int latestSupportedVersion;
private List<TaskId> activeTasks;
private Map<TaskId, Set<TopicPartition>> standbyTasks;
private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
- private AssignmentInfo(final int version) {
+ // used for decoding; don't apply version checks
+ private AssignmentInfo(final int version,
+ final int latestSupportedVersion) {
this.usedVersion = version;
+ this.latestSupportedVersion = latestSupportedVersion;
}
public AssignmentInfo(final List<TaskId> activeTasks,
@@ -57,11 +63,33 @@ public class AssignmentInfo {
this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
}
+ public AssignmentInfo() {
+ this(LATEST_SUPPORTED_VERSION,
+ Collections.<TaskId>emptyList(),
+ Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+ Collections.<HostInfo, Set<TopicPartition>>emptyMap());
+ }
+
public AssignmentInfo(final int version,
final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
final Map<HostInfo, Set<TopicPartition>> hostState) {
+ this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
+
+ if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+ throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
+ + "; was: " + version);
+ }
+ }
+
+ // for testing only; don't apply version checks
+ AssignmentInfo(final int version,
+ final int latestSupportedVersion,
+ final List<TaskId> activeTasks,
+ final Map<TaskId, Set<TopicPartition>> standbyTasks,
+ final Map<HostInfo, Set<TopicPartition>> hostState) {
this.usedVersion = version;
+ this.latestSupportedVersion = latestSupportedVersion;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
this.partitionsByHost = hostState;
@@ -71,6 +99,10 @@ public class AssignmentInfo {
return usedVersion;
}
+ public int latestSupportedVersion() {
+ return latestSupportedVersion;
+ }
+
public List<TaskId> activeTasks() {
return activeTasks;
}
@@ -98,6 +130,9 @@ public class AssignmentInfo {
case 2:
encodeVersionTwo(out);
break;
+ case 3:
+ encodeVersionThree(out);
+ break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@@ -161,6 +196,13 @@ public class AssignmentInfo {
}
}
+ private void encodeVersionThree(final DataOutputStream out) throws IOException {
+ out.writeInt(3);
+ out.writeInt(LATEST_SUPPORTED_VERSION);
+ encodeActiveAndStandbyTaskAssignment(out);
+ encodePartitionsByHost(out);
+ }
+
/**
* @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
*/
@@ -169,19 +211,25 @@ public class AssignmentInfo {
data.rewind();
try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
- // decode used version
- final int usedVersion = in.readInt();
- final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion);
+ final AssignmentInfo assignmentInfo;
+ final int usedVersion = in.readInt();
switch (usedVersion) {
case 1:
+ assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
decodeVersionOneData(assignmentInfo, in);
break;
case 2:
+ assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN);
decodeVersionTwoData(assignmentInfo, in);
break;
+ case 3:
+ final int latestSupportedVersion = in.readInt();
+ assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
+ decodeVersionThreeData(assignmentInfo, in);
+ break;
default:
- TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
+ TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " +
"used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
@@ -195,15 +243,23 @@ public class AssignmentInfo {
private static void decodeVersionOneData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
- // decode active tasks
- int count = in.readInt();
+ decodeActiveTasks(assignmentInfo, in);
+ decodeStandbyTasks(assignmentInfo, in);
+ assignmentInfo.partitionsByHost = new HashMap<>();
+ }
+
+ private static void decodeActiveTasks(final AssignmentInfo assignmentInfo,
+ final DataInputStream in) throws IOException {
+ final int count = in.readInt();
assignmentInfo.activeTasks = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
assignmentInfo.activeTasks.add(TaskId.readFrom(in));
}
+ }
- // decode standby tasks
- count = in.readInt();
+ private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo,
+ final DataInputStream in) throws IOException {
+ final int count = in.readInt();
assignmentInfo.standbyTasks = new HashMap<>(count);
for (int i = 0; i < count; i++) {
TaskId id = TaskId.readFrom(in);
@@ -213,9 +269,13 @@ public class AssignmentInfo {
private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
- decodeVersionOneData(assignmentInfo, in);
+ decodeActiveTasks(assignmentInfo, in);
+ decodeStandbyTasks(assignmentInfo, in);
+ decodeGlobalAssignmentData(assignmentInfo, in);
+ }
- // decode partitions by host
+ private static void decodeGlobalAssignmentData(final AssignmentInfo assignmentInfo,
+ final DataInputStream in) throws IOException {
assignmentInfo.partitionsByHost = new HashMap<>();
final int numEntries = in.readInt();
for (int i = 0; i < numEntries; i++) {
@@ -233,19 +293,27 @@ public class AssignmentInfo {
return partitions;
}
+ private static void decodeVersionThreeData(final AssignmentInfo assignmentInfo,
+ final DataInputStream in) throws IOException {
+ decodeActiveTasks(assignmentInfo, in);
+ decodeStandbyTasks(assignmentInfo, in);
+ decodeGlobalAssignmentData(assignmentInfo, in);
+ }
+
@Override
public int hashCode() {
- return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
+ return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
}
@Override
public boolean equals(final Object o) {
if (o instanceof AssignmentInfo) {
final AssignmentInfo other = (AssignmentInfo) o;
- return this.usedVersion == other.usedVersion &&
- this.activeTasks.equals(other.activeTasks) &&
- this.standbyTasks.equals(other.standbyTasks) &&
- this.partitionsByHost.equals(other.partitionsByHost);
+ return usedVersion == other.usedVersion &&
+ latestSupportedVersion == other.latestSupportedVersion &&
+ activeTasks.equals(other.activeTasks) &&
+ standbyTasks.equals(other.standbyTasks) &&
+ partitionsByHost.equals(other.partitionsByHost);
} else {
return false;
}
@@ -253,7 +321,11 @@ public class AssignmentInfo {
@Override
public String toString() {
- return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
+ return "[version=" + usedVersion
+ + ", supported version=" + latestSupportedVersion
+ + ", active tasks=" + activeTasks
+ + ", standby tasks=" + standbyTasks
+ + ", global assignment=" + partitionsByHost + "]";
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index 7fee90b..be70947 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -31,16 +32,21 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
- public static final int LATEST_SUPPORTED_VERSION = 2;
+ public static final int LATEST_SUPPORTED_VERSION = 3;
+ public static final int UNKNOWN = -1;
private final int usedVersion;
+ private final int latestSupportedVersion;
private UUID processId;
private Set<TaskId> prevTasks;
private Set<TaskId> standbyTasks;
private String userEndPoint;
- private SubscriptionInfo(final int version) {
+ // used for decoding; don't apply version checks
+ private SubscriptionInfo(final int version,
+ final int latestSupportedVersion) {
this.usedVersion = version;
+ this.latestSupportedVersion = latestSupportedVersion;
}
public SubscriptionInfo(final UUID processId,
@@ -55,7 +61,23 @@ public class SubscriptionInfo {
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final String userEndPoint) {
+ this(version, LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
+
+ if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
+ throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
+ + "; was: " + version);
+ }
+ }
+
+ // for testing only; don't apply version checks
+ protected SubscriptionInfo(final int version,
+ final int latestSupportedVersion,
+ final UUID processId,
+ final Set<TaskId> prevTasks,
+ final Set<TaskId> standbyTasks,
+ final String userEndPoint) {
this.usedVersion = version;
+ this.latestSupportedVersion = latestSupportedVersion;
this.processId = processId;
this.prevTasks = prevTasks;
this.standbyTasks = standbyTasks;
@@ -66,6 +88,10 @@ public class SubscriptionInfo {
return usedVersion;
}
+ public int latestSupportedVersion() {
+ return latestSupportedVersion;
+ }
+
public UUID processId() {
return processId;
}
@@ -93,7 +119,10 @@ public class SubscriptionInfo {
buf = encodeVersionOne();
break;
case 2:
- buf = encodeVersionTwo(prepareUserEndPoint());
+ buf = encodeVersionTwo();
+ break;
+ case 3:
+ buf = encodeVersionThree();
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
@@ -108,7 +137,9 @@ public class SubscriptionInfo {
final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
buf.putInt(1); // version
- encodeVersionOneData(buf);
+ encodeClientUUID(buf);
+ encodeTasks(buf, prevTasks);
+ encodeTasks(buf, standbyTasks);
return buf;
}
@@ -120,18 +151,15 @@ public class SubscriptionInfo {
4 + standbyTasks.size() * 8; // length + standby tasks
}
- private void encodeVersionOneData(final ByteBuffer buf) {
- // encode client UUID
+ private void encodeClientUUID(final ByteBuffer buf) {
buf.putLong(processId.getMostSignificantBits());
buf.putLong(processId.getLeastSignificantBits());
- // encode ids of previously running tasks
- buf.putInt(prevTasks.size());
- for (TaskId id : prevTasks) {
- id.writeTo(buf);
- }
- // encode ids of cached tasks
- buf.putInt(standbyTasks.size());
- for (TaskId id : standbyTasks) {
+ }
+
+ private void encodeTasks(final ByteBuffer buf,
+ final Collection<TaskId> taskIds) {
+ buf.putInt(taskIds.size());
+ for (TaskId id : taskIds) {
id.writeTo(buf);
}
}
@@ -144,52 +172,87 @@ public class SubscriptionInfo {
}
}
- private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
+ private ByteBuffer encodeVersionTwo() {
+ final byte[] endPointBytes = prepareUserEndPoint();
+
final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
buf.putInt(2); // version
- encodeVersionTwoData(buf, endPointBytes);
+ encodeClientUUID(buf);
+ encodeTasks(buf, prevTasks);
+ encodeTasks(buf, standbyTasks);
+ encodeUserEndPoint(buf, endPointBytes);
return buf;
}
private int getVersionTwoByteLength(final byte[] endPointBytes) {
- return getVersionOneByteLength() +
+ return 4 + // version
+ 16 + // client ID
+ 4 + prevTasks.size() * 8 + // length + prev tasks
+ 4 + standbyTasks.size() * 8 + // length + standby tasks
4 + endPointBytes.length; // length + userEndPoint
}
- private void encodeVersionTwoData(final ByteBuffer buf,
- final byte[] endPointBytes) {
- encodeVersionOneData(buf);
+ private void encodeUserEndPoint(final ByteBuffer buf,
+ final byte[] endPointBytes) {
if (endPointBytes != null) {
buf.putInt(endPointBytes.length);
buf.put(endPointBytes);
}
}
+ private ByteBuffer encodeVersionThree() {
+ final byte[] endPointBytes = prepareUserEndPoint();
+
+ final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+
+ buf.putInt(3); // used version
+ buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
+ encodeClientUUID(buf);
+ encodeTasks(buf, prevTasks);
+ encodeTasks(buf, standbyTasks);
+ encodeUserEndPoint(buf, endPointBytes);
+
+ return buf;
+ }
+
+ private int getVersionThreeByteLength(final byte[] endPointBytes) {
+ return 4 + // used version
+ 4 + // latest supported version
+ 16 + // client ID
+ 4 + prevTasks.size() * 8 + // length + prev tasks
+ 4 + standbyTasks.size() * 8 + // length + standby tasks
+ 4 + endPointBytes.length; // length + userEndPoint
+ }
+
/**
* @throws TaskAssignmentException if method fails to decode the data
*/
public static SubscriptionInfo decode(final ByteBuffer data) {
+ final SubscriptionInfo subscriptionInfo;
+
// ensure we are at the beginning of the ByteBuffer
data.rewind();
- // decode used version
final int usedVersion = data.getInt();
- final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion);
-
switch (usedVersion) {
case 1:
+ subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
decodeVersionOneData(subscriptionInfo, data);
break;
case 2:
+ subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
decodeVersionTwoData(subscriptionInfo, data);
break;
+ case 3:
+ final int latestSupportedVersion = data.getInt();
+ subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
+ decodeVersionThreeData(subscriptionInfo, data);
+ break;
default:
- TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
- "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
- log.error(fatalException.getMessage(), fatalException);
- throw fatalException;
+ subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
+ log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
}
return subscriptionInfo;
@@ -197,30 +260,43 @@ public class SubscriptionInfo {
private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
- // decode client UUID
- subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+ decodeClientUUID(subscriptionInfo, data);
- // decode previously active tasks
- final int numPrevs = data.getInt();
subscriptionInfo.prevTasks = new HashSet<>();
- for (int i = 0; i < numPrevs; i++) {
- TaskId id = TaskId.readFrom(data);
- subscriptionInfo.prevTasks.add(id);
- }
+ decodeTasks(subscriptionInfo.prevTasks, data);
- // decode previously cached tasks
- final int numCached = data.getInt();
subscriptionInfo.standbyTasks = new HashSet<>();
- for (int i = 0; i < numCached; i++) {
- subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
+ decodeTasks(subscriptionInfo.standbyTasks, data);
+ }
+
+ private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo,
+ final ByteBuffer data) {
+ subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+ }
+
+ private static void decodeTasks(final Collection<TaskId> taskIds,
+ final ByteBuffer data) {
+ final int numPrevs = data.getInt();
+ for (int i = 0; i < numPrevs; i++) {
+ taskIds.add(TaskId.readFrom(data));
}
}
private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo,
final ByteBuffer data) {
- decodeVersionOneData(subscriptionInfo, data);
+ decodeClientUUID(subscriptionInfo, data);
+
+ subscriptionInfo.prevTasks = new HashSet<>();
+ decodeTasks(subscriptionInfo.prevTasks, data);
- // decode user end point (can be null)
+ subscriptionInfo.standbyTasks = new HashSet<>();
+ decodeTasks(subscriptionInfo.standbyTasks, data);
+
+ decodeUserEndPoint(subscriptionInfo, data);
+ }
+
+ private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo,
+ final ByteBuffer data) {
int bytesLength = data.getInt();
if (bytesLength != 0) {
final byte[] bytes = new byte[bytesLength];
@@ -229,9 +305,21 @@ public class SubscriptionInfo {
}
}
- @Override
+ private static void decodeVersionThreeData(final SubscriptionInfo subscriptionInfo,
+ final ByteBuffer data) {
+ decodeClientUUID(subscriptionInfo, data);
+
+ subscriptionInfo.prevTasks = new HashSet<>();
+ decodeTasks(subscriptionInfo.prevTasks, data);
+
+ subscriptionInfo.standbyTasks = new HashSet<>();
+ decodeTasks(subscriptionInfo.standbyTasks, data);
+
+ decodeUserEndPoint(subscriptionInfo, data);
+ }
+
public int hashCode() {
- final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+ final int hashCode = usedVersion ^ latestSupportedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
if (userEndPoint == null) {
return hashCode;
}
@@ -243,6 +331,7 @@ public class SubscriptionInfo {
if (o instanceof SubscriptionInfo) {
final SubscriptionInfo other = (SubscriptionInfo) o;
return this.usedVersion == other.usedVersion &&
+ this.latestSupportedVersion == other.latestSupportedVersion &&
this.processId.equals(other.processId) &&
this.prevTasks.equals(other.prevTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&
@@ -252,4 +341,13 @@ public class SubscriptionInfo {
}
}
+ @Override
+ public String toString() {
+ return "[version=" + usedVersion
+ + ", supported version=" + latestSupportedVersion
+ + ", process ID=" + processId
+ + ", prev tasks=" + prevTasks
+ + ", standby tasks=" + standbyTasks
+ + ", user endpoint=" + userEndPoint + "]";
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index e9ed968..4e04b49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -46,7 +46,6 @@ import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.easymock.Capture;
import org.easymock.EasyMock;
-import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
@@ -64,6 +63,7 @@ import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
public class StreamsPartitionAssignorTest {
@@ -867,9 +867,12 @@ public class StreamsPartitionAssignorTest {
final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
- assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
+ assertEquals(
+ Utils.mkSet(
+ new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1),
- new TopicPartition("topic1", 2)), topicPartitions);
+ new TopicPartition("topic1", 2)),
+ topicPartitions);
}
@Test
@@ -881,7 +884,7 @@ public class StreamsPartitionAssignorTest {
try {
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost"));
- Assert.fail("expected to an exception due to invalid config");
+ fail("expected to an exception due to invalid config");
} catch (ConfigException e) {
// pass
}
@@ -893,7 +896,7 @@ public class StreamsPartitionAssignorTest {
try {
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk"));
- Assert.fail("expected to an exception due to invalid config");
+ fail("expected to an exception due to invalid config");
} catch (ConfigException e) {
// pass
}
@@ -1088,21 +1091,36 @@ public class StreamsPartitionAssignorTest {
}
@Test
- public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() {
+ shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 2);
+ }
+
+ @Test
+ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() {
+ shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(1, 3);
+ }
+
+ @Test
+ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() {
+ shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(2, 3);
+ }
+
+ private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(final int smallestVersion,
+ final int otherVersion) {
final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
final Set<TaskId> emptyTasks = Collections.emptySet();
subscriptions.put(
"consumer1",
new PartitionAssignor.Subscription(
Collections.singletonList("topic1"),
- new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+ new SubscriptionInfo(smallestVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
)
);
subscriptions.put(
"consumer2",
new PartitionAssignor.Subscription(
Collections.singletonList("topic1"),
- new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+ new SubscriptionInfo(otherVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
)
);
@@ -1115,12 +1133,12 @@ public class StreamsPartitionAssignorTest {
final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
assertThat(assignment.size(), equalTo(2));
- assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1));
- assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1));
+ assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(smallestVersion));
+ assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion));
}
@Test
- public void shouldDownGradeSubscription() {
+ public void shouldDownGradeSubscriptionToVersion1() {
final Set<TaskId> emptyTasks = Collections.emptySet();
mockTaskManager(
@@ -1135,6 +1153,46 @@ public class StreamsPartitionAssignorTest {
assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1));
}
+ @Test
+ public void shouldDownGradeSubscriptionToVersion2For0101() {
+ shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101);
+ }
+
+ @Test
+ public void shouldDownGradeSubscriptionToVersion2For0102() {
+ shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102);
+ }
+
+ @Test
+ public void shouldDownGradeSubscriptionToVersion2For0110() {
+ shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110);
+ }
+
+ @Test
+ public void shouldDownGradeSubscriptionToVersion2For10() {
+ shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10);
+ }
+
+ @Test
+ public void shouldDownGradeSubscriptionToVersion2For11() {
+ shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11);
+ }
+
+ private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue) {
+ final Set<TaskId> emptyTasks = Collections.emptySet();
+
+ mockTaskManager(
+ emptyTasks,
+ emptyTasks,
+ UUID.randomUUID(),
+ builder);
+ configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue));
+
+ PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+ assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2));
+ }
+
private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
Collections.<TaskId, Set<TopicPartition>>emptyMap(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index c1020a9..c7382e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -22,85 +22,70 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.HostInfo;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
public class AssignmentInfoTest {
+ private final List<TaskId> activeTasks = Arrays.asList(
+ new TaskId(0, 0),
+ new TaskId(0, 0),
+ new TaskId(0, 1), new TaskId(1, 0));
+ private final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<TaskId, Set<TopicPartition>>() {
+ {
+ put(new TaskId(1, 1),
+ Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+ put(new TaskId(2, 0),
+ Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+ }
+ };
+ private final Map<HostInfo, Set<TopicPartition>> globalAssignment = new HashMap<HostInfo, Set<TopicPartition>>() {
+ {
+ put(new HostInfo("localhost", 80),
+ Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t3", 3)));
+ }
+ };
@Test
- public void testEncodeDecode() {
- List<TaskId> activeTasks =
- Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
- Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
- standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
- standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+ public void shouldUseLatestSupportedVersionByDefault() {
+ final AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, globalAssignment);
+ assertEquals(AssignmentInfo.LATEST_SUPPORTED_VERSION, info.version());
+ }
- AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
- AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowForUnknownVersion1() {
+ new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment);
+ }
- assertEquals(info, decoded);
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowForUnknownVersion2() {
+ new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment);
}
@Test
- public void shouldDecodePreviousVersion() throws IOException {
- List<TaskId> activeTasks =
- Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
- Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
- standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
- standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
- final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null);
- final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
- assertEquals(oldVersion.activeTasks(), decoded.activeTasks());
- assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks());
- assertNull(decoded.partitionsByHost()); // should be null as wasn't in V1
- assertEquals(1, decoded.version());
+ public void shouldEncodeAndDecodeVersion1() {
+ final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment);
+ final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, Set<TopicPartition>>emptyMap());
+ assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
}
- /**
- * This is a clone of what the V1 encoding did. The encode method has changed for V2
- * so it is impossible to test compatibility without having this
- */
- private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos);
- // Encode version
- out.writeInt(oldVersion.version());
- // Encode active tasks
- out.writeInt(oldVersion.activeTasks().size());
- for (TaskId id : oldVersion.activeTasks()) {
- id.writeTo(out);
- }
- // Encode standby tasks
- out.writeInt(oldVersion.standbyTasks().size());
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks().entrySet()) {
- TaskId id = entry.getKey();
- id.writeTo(out);
-
- Set<TopicPartition> partitions = entry.getValue();
- out.writeInt(partitions.size());
- for (TopicPartition partition : partitions) {
- out.writeUTF(partition.topic());
- out.writeInt(partition.partition());
- }
- }
-
- out.flush();
- out.close();
-
- return ByteBuffer.wrap(baos.toByteArray());
+ @Test
+ public void shouldEncodeAndDecodeVersion2() {
+ final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment);
+ final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment);
+ assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
+ }
+ @Test
+ public void shouldEncodeAndDecodeVersion3() {
+ final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment);
+ final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment);
+ assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 633285a..e98b8ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -19,81 +19,60 @@ package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Test;
-import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
public class SubscriptionInfoTest {
+ private final UUID processId = UUID.randomUUID();
+ private final Set<TaskId> activeTasks = new HashSet<>(Arrays.asList(
+ new TaskId(0, 0),
+ new TaskId(0, 1),
+ new TaskId(1, 0)));
+ private final Set<TaskId> standbyTasks = new HashSet<>(Arrays.asList(
+ new TaskId(1, 1),
+ new TaskId(2, 0)));
- @Test
- public void testEncodeDecode() {
- UUID processId = UUID.randomUUID();
+ private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
- Set<TaskId> activeTasks =
- new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
- Set<TaskId> standbyTasks =
- new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
+ @Test
+ public void shouldUseLatestSupportedVersionByDefault() {
+ final SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, "localhost:80");
+ assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION, info.version());
+ }
- SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, null);
- SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowForUnknownVersion1() {
+ new SubscriptionInfo(0, processId, activeTasks, standbyTasks, "localhost:80");
+ }
- assertEquals(info, decoded);
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowForUnknownVersion2() {
+ new SubscriptionInfo(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, processId, activeTasks, standbyTasks, "localhost:80");
}
@Test
- public void shouldEncodeDecodeWithUserEndPoint() {
- SubscriptionInfo original = new SubscriptionInfo(UUID.randomUUID(),
- Collections.singleton(new TaskId(0, 0)), Collections.<TaskId>emptySet(), "localhost:80");
- SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode());
- assertEquals(original, decoded);
+ public void shouldEncodeAndDecodeVersion1() {
+ final SubscriptionInfo info = new SubscriptionInfo(1, processId, activeTasks, standbyTasks, IGNORED_USER_ENDPOINT);
+ final SubscriptionInfo expectedInfo = new SubscriptionInfo(1, SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, null);
+ assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
@Test
- public void shouldBeBackwardCompatible() {
- UUID processId = UUID.randomUUID();
-
- Set<TaskId> activeTasks =
- new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
- Set<TaskId> standbyTasks =
- new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
-
- final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks);
- final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
- assertEquals(activeTasks, decode.prevTasks());
- assertEquals(standbyTasks, decode.standbyTasks());
- assertEquals(processId, decode.processId());
- assertNull(decode.userEndPoint());
+ public void shouldEncodeAndDecodeVersion2() {
+ final SubscriptionInfo info = new SubscriptionInfo(2, processId, activeTasks, standbyTasks, "localhost:80");
+ final SubscriptionInfo expectedInfo = new SubscriptionInfo(2, SubscriptionInfo.UNKNOWN, processId, activeTasks, standbyTasks, "localhost:80");
+ assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
- /**
- * This is a clone of what the V1 encoding did. The encode method has changed for V2
- * so it is impossible to test compatibility without having this
- */
- private ByteBuffer encodePreviousVersion(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
- ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
- // version
- buf.putInt(1);
- // encode client UUID
- buf.putLong(processId.getMostSignificantBits());
- buf.putLong(processId.getLeastSignificantBits());
- // encode ids of previously running tasks
- buf.putInt(prevTasks.size());
- for (TaskId id : prevTasks) {
- id.writeTo(buf);
- }
- // encode ids of cached tasks
- buf.putInt(standbyTasks.size());
- for (TaskId id : standbyTasks) {
- id.writeTo(buf);
- }
- buf.rewind();
-
- return buf;
+ @Test
+ public void shouldEncodeAndDecodeVersion3() {
+ final SubscriptionInfo info = new SubscriptionInfo(3, processId, activeTasks, standbyTasks, "localhost:80");
+ final SubscriptionInfo expectedInfo = new SubscriptionInfo(3, SubscriptionInfo.LATEST_SUPPORTED_VERSION, processId, activeTasks, standbyTasks, "localhost:80");
+ assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
}
+
}
diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..a8796cb
--- /dev/null
+++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ /**
+ * Echoes topic "data" to topic "echo" (args: kafka-url, properties-file). This test cannot be executed as long as Kafka 1.1.1 is not released.
+ */
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("StreamsUpgradeTest requires two arguments (kafka-url, properties-file) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] : ""));
+ }
+ final String kafka = args[0];
+ final String propFileName = args.length > 1 ? args[1] : null; // null when no properties file was passed
+
+ final Properties streamsProperties = Utils.loadProps(propFileName);
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest v1.1)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("props=" + streamsProperties);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(printProcessorSupplier());
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ config.putAll(streamsProperties); // applied last, so entries from the properties file override the values set above
+
+ final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ return new ProcessorSupplier<K, V>() {
+ public Processor<K, V> get() {
+ return new AbstractProcessor<K, V>() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) { // report progress once per 100 records
+ System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+ }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 796ca31..a4b902a 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -412,6 +412,7 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
"org.apache.kafka.streams.tests.StreamsUpgradeTest",
"")
self.UPGRADE_FROM = None
+ self.UPGRADE_TO = None
def set_version(self, kafka_streams_version):
self.KAFKA_STREAMS_VERSION = kafka_streams_version
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index fa79d57..8b7d771 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -23,8 +23,16 @@ from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATES
import random
import time
+# broker 0.10.0 is not compatible with newer Kafka Streams versions
broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(DEV_BRANCH)]
-simple_upgrade_versions_metadata_version_2 = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(DEV_VERSION)]
+
+metadata_1_versions = [str(LATEST_0_10_0)]
+metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+# we can add the following versions to `backward_compatible_metadata_2_versions` after the corresponding
+# bug-fix releases 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, and 1.1.1 are available:
+# str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)
+backward_compatible_metadata_2_versions = []
+metadata_3_versions = [str(DEV_VERSION)]
class StreamsUpgradeTest(Test):
"""
@@ -39,6 +47,7 @@ class StreamsUpgradeTest(Test):
'echo' : { 'partitions': 5 },
'data' : { 'partitions': 5 },
}
+ self.leader = None
def perform_broker_upgrade(self, to_version):
self.logger.info("First pass bounce - rolling broker upgrade")
@@ -114,7 +123,7 @@ class StreamsUpgradeTest(Test):
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
- @matrix(from_version=simple_upgrade_versions_metadata_version_2, to_version=simple_upgrade_versions_metadata_version_2)
+ @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions)
def test_simple_upgrade_downgrade(self, from_version, to_version):
"""
Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>
@@ -165,15 +174,12 @@ class StreamsUpgradeTest(Test):
self.driver.stop()
- #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
- #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released
- #@parametrize(new_version=str(LATEST_0_11_0)) we cannot run this test until Kafka 0.11.0.3 is released
- #@parametrize(new_version=str(LATEST_1_0)) we cannot run this test until Kafka 1.0.2 is released
- #@parametrize(new_version=str(LATEST_1_1)) we cannot run this test until Kafka 1.1.1 is released
- @parametrize(new_version=str(DEV_VERSION))
- def test_metadata_upgrade(self, new_version):
+ #@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
+ @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions)
+ @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions)
+ def test_metadata_upgrade(self, from_version, to_version):
"""
- Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
+ Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
"""
self.zk = ZookeeperService(self.test_context, num_nodes=1)
@@ -189,7 +195,7 @@ class StreamsUpgradeTest(Test):
self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
self.driver.start()
- self.start_all_nodes_with(str(LATEST_0_10_0))
+ self.start_all_nodes_with(from_version)
self.processors = [self.processor1, self.processor2, self.processor3]
@@ -200,13 +206,13 @@ class StreamsUpgradeTest(Test):
random.shuffle(self.processors)
for p in self.processors:
p.CLEAN_NODE_ENABLED = False
- self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+ self.do_rolling_bounce(p, from_version[:-2], to_version, counter)
counter = counter + 1
# second rolling bounce
random.shuffle(self.processors)
for p in self.processors:
- self.do_rolling_bounce(p, None, new_version, counter)
+ self.do_rolling_bounce(p, None, to_version, counter)
counter = counter + 1
# shutdown
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 66e5fcf..7823efa 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -63,17 +63,17 @@ def get_version(node=None):
DEV_BRANCH = KafkaVersion("dev")
DEV_VERSION = KafkaVersion("1.2.0-SNAPSHOT")
-# 0.8.2.X versions
+# 0.8.2.x versions
V_0_8_2_1 = KafkaVersion("0.8.2.1")
V_0_8_2_2 = KafkaVersion("0.8.2.2")
LATEST_0_8_2 = V_0_8_2_2
-# 0.9.0.X versions
+# 0.9.0.x versions
V_0_9_0_0 = KafkaVersion("0.9.0.0")
V_0_9_0_1 = KafkaVersion("0.9.0.1")
LATEST_0_9 = V_0_9_0_1
-# 0.10.0.X versions
+# 0.10.0.x versions
V_0_10_0_0 = KafkaVersion("0.10.0.0")
V_0_10_0_1 = KafkaVersion("0.10.0.1")
LATEST_0_10_0 = V_0_10_0_1
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.