You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/27 03:32:26 UTC
[kafka] branch 0.11.0 updated: KAFKA-6054: Fix upgrade path from
Kafka Streams v0.10.0 (#4761)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
new 13dbcad KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4761)
13dbcad is described below
commit 13dbcad9bb2096195d45d30b59513f4db6a92b1a
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Mar 26 20:32:20 2018 -0700
KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4761)
Introduces new config parameter `upgrade.from`.
Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
bin/kafka-run-class.sh | 40 +++-
build.gradle | 36 +++
.../authenticator/SaslClientCallbackHandler.java | 9 +-
docs/streams/upgrade-guide.html | 27 ++-
docs/upgrade.html | 54 ++++-
gradle/dependencies.gradle | 6 +
settings.gradle | 3 +-
.../org/apache/kafka/streams/StreamsConfig.java | 17 ++
.../internals/StreamPartitionAssignor.java | 17 +-
.../internals/assignment/AssignmentInfo.java | 6 +-
.../internals/assignment/SubscriptionInfo.java | 4 +-
.../apache/kafka/streams/StreamsConfigTest.java | 47 ++--
.../KStreamAggregationDedupIntegrationTest.java | 4 +-
.../internals/StreamPartitionAssignorTest.java | 154 +++++++++----
.../internals/assignment/AssignmentInfoTest.java | 3 +-
.../kafka/streams/tests/SmokeTestDriver.java | 38 ++--
.../apache/kafka/streams/tests/SmokeTestUtil.java | 9 +-
.../kafka/streams/tests/StreamsSmokeTest.java | 14 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 73 ++++++
.../kafka/streams/tests/StreamsUpgradeTest.java | 104 +++++++++
.../kafka/streams/tests/StreamsUpgradeTest.java | 114 ++++++++++
.../kafka/streams/tests/StreamsUpgradeTest.java | 108 +++++++++
tests/kafkatest/services/streams.py | 173 ++++++++++++++-
.../tests/streams/streams_upgrade_test.py | 246 +++++++++++++++++++++
tests/kafkatest/version.py | 5 +-
vagrant/base.sh | 2 +
26 files changed, 1183 insertions(+), 130 deletions(-)
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fe6aefd..8e2ba91 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
fi
done
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
- if should_include_file "$file"; then
- CLASSPATH="$CLASSPATH":"$file"
- fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+ clients_lib_dir=$(dirname $0)/../clients/build/libs
+ streams_lib_dir=$(dirname $0)/../streams/build/libs
+ rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+ clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+ streams_lib_dir=$clients_lib_dir
+ rocksdb_lib_dir=$streams_lib_dir
+fi
+
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+ for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+ do
+ if should_include_file "$file"; then
+ CLASSPATH="$CLASSPATH":"$file"
+ fi
+ done
+else
+ VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+ SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
+ for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+ do
+ if should_include_file "$file"; then
+ CLASSPATH="$CLASSPATH":"$file"
+ fi
+ done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
diff --git a/build.gradle b/build.gradle
index ce4b4e4..17f3e00 100644
--- a/build.gradle
+++ b/build.gradle
@@ -909,6 +909,42 @@ project(':streams:examples') {
}
}
+project(':streams:upgrade-system-tests-0100') {
+ archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+ dependencies {
+ testCompile libs.kafkaStreams_0100
+ }
+
+ systemTestLibs {
+ dependsOn testJar
+ }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+ archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+ dependencies {
+ testCompile libs.kafkaStreams_0101
+ }
+
+ systemTestLibs {
+ dependsOn testJar
+ }
+}
+
+project(':streams:upgrade-system-tests-0102') {
+ archivesBaseName = "kafka-streams-upgrade-system-tests-0102"
+
+ dependencies {
+ testCompile libs.kafkaStreams_0102
+ }
+
+ systemTestLibs {
+ dependsOn testJar
+ }
+}
+
project(':jmh-benchmarks') {
apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 7111bad..7102414 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.common.security.authenticator;
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
@@ -25,10 +27,7 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
/**
* Callback handler for Sasl clients. The callbacks required for the SASL mechanism
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 7f2c9f6..86d6d53 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -27,16 +27,33 @@
</p>
<p>
- If you want to upgrade from 0.10.1.x to 0.10.2, see the <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 0.10.2</b></a>.
+ If you want to upgrade from 0.10.1.x to 0.11.0, see the <a href="/{{version}}/documentation/#upgrade_1020_streams"><b>Upgrade Section for 0.10.2</b></a>.
It highlights incompatible changes you need to consider to upgrade your code and application.
- See <a href="#streams_api_changes_0102">below</a> a complete list of 0.10.2 API and semantical changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+ See below a complete list of <a href="#streams_api_changes_0102">0.10.2</a> and <a href="#streams_api_changes_0110">0.11.0</a> API and semantical changes
+ that allow you to advance your application and/or simplify your code base, including the usage of new features.
</p>
<p>
- If you want to upgrade from 0.10.0.x to 0.10.1, see the <a href="/{{version}}/documentation/#upgrade_1010_streams"><b>Upgrade Section for 0.10.1</b></a>.
- It highlights incompatible changes you need to consider to upgrade your code and application.
- See <a href="#streams_api_changes_0101">below</a> a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+ Upgrading from 0.10.0.x to 0.11.0.x directly is also possible.
+ Note that brokers must be on version 0.10.1 or higher to run a Kafka Streams application of version 0.10.1 or higher.
+ See <a href="#streams_api_changes_0101">Streams API changes in 0.10.1</a>, <a href="#streams_api_changes_0102">Streams API changes in 0.10.2</a>,
+ and <a href="#streams_api_changes_0110">Streams API changes in 0.11.0</a> for a complete list of API changes.
+ Upgrading to 0.11.0.3 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+ (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+ As an alternative, an offline upgrade is also possible.
</p>
+ <ul>
+ <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.11.0.3 </li>
+ <li> bounce each instance of your application once </li>
+ <li> prepare your newly deployed 0.11.0.3 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+ <li> bounce each instance of your application once more to complete the upgrade </li>
+ </ul>
+ <p> Upgrading from 0.10.0.x to 0.11.0.0, 0.11.0.1, or 0.11.0.2 requires an offline upgrade (rolling bounce upgrade is not supported) </p>
+ <ul>
+ <li> stop all old (0.10.0.x) application instances </li>
+ <li> update your code and swap old code and jar file with new code and new jar file </li>
+ <li> restart all new (0.11.0.0, 0.11.0.1, or 0.11.0.2) application instances </li>
+ </ul>
<h3><a id="streams_api_changes_0110" href="#streams_api_changes_0110">Streams API changes in 0.11.0.0</a></h3>
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 9f0dbdf..0603875 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -64,6 +64,12 @@
before you switch to 0.11.0.</li>
</ol>
+<h5><a id="upgrade_1103_notable" href="#upgrade_1103_notable">Notable changes in 0.11.0.3</a></h5>
+<ul>
+ <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+ <li> See the <a href="/{{version}}/documentation/streams/upgrade-guide.html"><b>Kafka Streams upgrade guide</b></a> for details about this new config. </li>
+</ul>
+
<h5><a id="upgrade_1100_notable" href="#upgrade_1100_notable">Notable changes in 0.11.0.0</a></h5>
<ul>
<li>Unclean leader election is now disabled by default. The new default favors durability over availability. Users who wish to
@@ -214,14 +220,41 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
<li> See <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details. </li>
</ul>
+<h5><a id="upgrade_1020_streams_from_0100" href="#upgrade_1020_streams_from_0100">Upgrading a 0.10.0 Kafka Streams Application</a></h5>
+<ul>
+ <li> Upgrading your Streams application from 0.10.0 to 0.10.2 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.2 application can only connect to 0.10.2 or 0.10.1 brokers. </li>
+ <li> There are a couple of API changes that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0102">Streams API changes in 0.10.2</a> for more details).
+ Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+ <li> Upgrading from 0.10.0.x to 0.10.2.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+ (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+ As an alternative, an offline upgrade is also possible.
+ <ul>
+ <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.2.2 </li>
+ <li> bounce each instance of your application once </li>
+ <li> prepare your newly deployed 0.10.2.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+ <li> bounce each instance of your application once more to complete the upgrade </li>
+ </ul>
+ </li>
+ <li> Upgrading from 0.10.0.x to 0.10.2.0 or 0.10.2.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+ <ul>
+ <li> stop all old (0.10.0.x) application instances </li>
+ <li> update your code and swap old code and jar file with new code and new jar file </li>
+ <li> restart all new (0.10.2.0 or 0.10.2.1) application instances </li>
+ </ul>
+ </li>
+</ul>
+
+<h5><a id="upgrade_10202_notable" href="#upgrade_10202_notable">Notable changes in 0.10.2.2</a></h5>
+<ul>
+ <li> New configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from version 0.10.0.x </li>
+</ul>
+
<h5><a id="upgrade_10201_notable" href="#upgrade_10201_notable">Notable changes in 0.10.2.1</a></h5>
<ul>
<li> The default values for two configurations of the StreamsConfig class were changed to improve the resiliency of Kafka Streams applications. The internal Kafka Streams producer <code>retries</code> default value was changed from 0 to 10. The internal Kafka Streams consumer <code>max.poll.interval.ms</code> default value was changed from 300000 to <code>Integer.MAX_VALUE</code>.
</li>
</ul>
-
-
<h5><a id="upgrade_1020_notable" href="#upgrade_1020_notable">Notable changes in 0.10.2.0</a></h5>
<ul>
<li>The Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.2 clients
@@ -294,6 +327,23 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
<li> Upgrading your Streams application from 0.10.0 to 0.10.1 does require a <a href="#upgrade_10_1">broker upgrade</a> because a Kafka Streams 0.10.1 application can only connect to 0.10.1 brokers. </li>
<li> There are couple of API changes, that are not backward compatible (cf. <a href="/{{version}}/documentation/streams#streams_api_changes_0101">Streams API changes in 0.10.1</a> for more details).
Thus, you need to update and recompile your code. Just swapping the Kafka Streams library jar file will not work and will break your application. </li>
+ <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+ (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+ As an alternative, an offline upgrade is also possible.
+ <ul>
+ <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
+ <li> bounce each instance of your application once </li>
+ <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+ <li> bounce each instance of your application once more to complete the upgrade </li>
+ </ul>
+ </li>
+ <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+ <ul>
+ <li> stop all old (0.10.0.x) application instances </li>
+ <li> update your code and swap old code and jar file with new code and new jar file </li>
+ <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
+ </ul>
+ </li>
</ul>
<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 5d145e1..d881353 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -55,6 +55,9 @@ versions += [
jackson: "2.8.5",
jetty: "9.2.22.v20170606",
jersey: "2.24",
+ kafka_0100: "0.10.0.1",
+ kafka_0101: "0.10.1.1",
+ kafka_0102: "0.10.2.1",
log4j: "1.2.17",
jopt: "5.0.3",
junit: "4.12",
@@ -96,6 +99,9 @@ libs += [
junit: "junit:junit:$versions.junit",
log4j: "log4j:log4j:$versions.log4j",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
+ kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
+ kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
+ kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
lz4: "net.jpountz.lz4:lz4:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
powermock: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git a/settings.gradle b/settings.gradle
index f0fdf07..769046f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100',
+ 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', 'log4j-appender',
'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b411344..d45b135 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -106,6 +106,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String PRODUCER_PREFIX = "producer.";
/**
+ * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
+ */
+ public static final String UPGRADE_FROM_0100 = "0.10.0";
+
+ /**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
*/
public static final String AT_LEAST_ONCE = "at_least_once";
@@ -247,6 +252,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface. This config is deprecated, use <code>" + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "</code> instead";
+ /** {@code upgrade.from} */
+ public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+ public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+ "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+
/**
* {@code value.serde}
* @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead.
@@ -466,6 +476,12 @@ public class StreamsConfig extends AbstractConfig {
null,
Importance.LOW,
TIMESTAMP_EXTRACTOR_CLASS_DOC)
+ .define(UPGRADE_FROM_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ in(null, UPGRADE_FROM_0100),
+ Importance.LOW,
+ UPGRADE_FROM_DOC)
.define(VALUE_SERDE_CLASS_CONFIG,
Type.CLASS,
null,
@@ -632,6 +648,7 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
// add configs required for stream partition assignor
+ consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 0a1b2ab..6e2bfa6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -165,6 +165,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private String userEndPoint;
private int numStandbyReplicas;
+ private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
+
private Cluster metadataWithInternalTopics;
private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
@@ -192,6 +194,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
public void configure(Map<String, ?> configs) {
numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+ final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+ if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
+ log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
+ userMetadataVersion = 1;
+ }
+
Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
if (o == null) {
KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -251,7 +259,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
final Set<TaskId> previousActiveTasks = streamThread.prevActiveTasks();
Set<TaskId> standbyTasks = streamThread.cachedTasks();
standbyTasks.removeAll(previousActiveTasks);
- SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint);
+ SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint);
if (streamThread.builder.sourceTopicPattern() != null &&
!streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) {
@@ -295,11 +303,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// construct the client metadata from the decoded subscription info
Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+ int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+ final int usedVersion = info.version;
+ if (usedVersion < minUserMetadataVersion) {
+ minUserMetadataVersion = usedVersion;
+ }
// create the new client metadata if necessary
ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
@@ -556,7 +569,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
// finally, encode the assignment before sending back to coordinator
- assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
+ assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode()));
i++;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 77fb58a..5409976 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -55,7 +55,7 @@ public class AssignmentInfo {
this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
}
- protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+ public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
this.version = version;
this.activeTasks = activeTasks;
@@ -154,9 +154,7 @@ public class AssignmentInfo {
}
}
- return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
-
-
+ return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions);
} catch (IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index f583dba..00227e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -31,7 +31,7 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
- private static final int CURRENT_VERSION = 2;
+ public static final int CURRENT_VERSION = 2;
public final int version;
public final UUID processId;
@@ -43,7 +43,7 @@ public class SubscriptionInfo {
this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
}
- private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+ public SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
this.version = version;
this.processId = processId;
this.prevTasks = prevTasks;
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3bbd69e..9998283 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -82,7 +82,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetProducerConfigs() throws Exception {
+ public void testGetProducerConfigs() {
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
@@ -91,7 +91,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetConsumerConfigs() throws Exception {
+ public void testGetConsumerConfigs() {
final String groupId = "example-application";
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, groupId, clientId);
@@ -102,7 +102,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetRestoreConsumerConfigs() throws Exception {
+ public void testGetRestoreConsumerConfigs() {
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
@@ -143,7 +143,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+ public void shouldSupportPrefixedConsumerConfigs() {
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -153,7 +153,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+ public void shouldSupportPrefixedRestoreConsumerConfigs() {
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -163,7 +163,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -171,7 +171,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -179,7 +179,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(producerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -188,7 +188,7 @@ public class StreamsConfigTest {
@Test
- public void shouldSupportPrefixedProducerConfigs() throws Exception {
+ public void shouldSupportPrefixedProducerConfigs() {
props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -198,7 +198,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+ public void shouldBeSupportNonPrefixedConsumerConfigs() {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -208,7 +208,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+ public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -218,7 +218,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+ public void shouldSupportNonPrefixedProducerConfigs() {
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -227,24 +227,22 @@ public class StreamsConfigTest {
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
-
-
@Test(expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+ public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.defaultKeySerde();
}
@Test(expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+ public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.defaultValueSerde();
}
@Test
- public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+ public void shouldOverrideStreamsDefaultConsumerConfigs() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -254,7 +252,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+ public void shouldOverrideStreamsDefaultProducerConfigs() {
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -262,7 +260,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+ public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -272,21 +270,21 @@ public class StreamsConfigTest {
}
@Test(expected = ConfigException.class)
- public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+ public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getConsumerConfigs(null, "a", "b");
}
@Test(expected = ConfigException.class)
- public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+ public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getRestoreConsumerConfigs("client");
}
@Test
- public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception {
+ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
@@ -395,6 +393,7 @@ public class StreamsConfigTest {
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs));
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() {
final Properties props = minimalStreamsConfig();
@@ -429,6 +428,7 @@ public class StreamsConfigTest {
assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
final Properties props = minimalStreamsConfig();
@@ -442,6 +442,7 @@ public class StreamsConfigTest {
}
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectKeySerdeClassOnError() {
final Properties props = minimalStreamsConfig();
@@ -455,6 +456,7 @@ public class StreamsConfigTest {
}
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
final Properties props = minimalStreamsConfig();
@@ -468,6 +470,7 @@ public class StreamsConfigTest {
}
}
+ @SuppressWarnings("deprecation")
@Test
public void shouldSpecifyCorrectValueSerdeClassOnError() {
final Properties props = minimalStreamsConfig();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 372b89c..bb4b575 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -51,7 +52,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import kafka.utils.MockTime;
import org.junit.experimental.categories.Category;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -315,6 +315,4 @@ public class KStreamAggregationDedupIntegrationTest {
}
-
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 98cd20a..a29380f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -135,7 +135,7 @@ public class StreamPartitionAssignorTest {
@SuppressWarnings("unchecked")
@Test
- public void testSubscription() throws Exception {
+ public void testSubscription() {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -187,7 +187,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignBasic() throws Exception {
+ public void testAssignBasic() {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -239,12 +239,10 @@ public class StreamPartitionAssignorTest {
assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));
// check assignment info
-
- Set<TaskId> allActiveTasks = new HashSet<>();
+ AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
// the first consumer
- AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
+ Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
// the second consumer
AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
@@ -264,7 +262,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithPartialTopology() throws Exception {
+ public void testAssignWithPartialTopology() {
Properties props = configProps();
props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class);
StreamsConfig config = new StreamsConfig(props);
@@ -306,9 +304,8 @@ public class StreamPartitionAssignorTest {
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
// check assignment info
- Set<TaskId> allActiveTasks = new HashSet<>();
AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
+ Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -316,7 +313,7 @@ public class StreamPartitionAssignorTest {
@Test
- public void testAssignEmptyMetadata() throws Exception {
+ public void testAssignEmptyMetadata() {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
@@ -359,9 +356,8 @@ public class StreamPartitionAssignorTest {
new HashSet<>(assignments.get("consumer10").partitions()));
// check assignment info
- Set<TaskId> allActiveTasks = new HashSet<>();
AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
+ Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
assertEquals(0, allActiveTasks.size());
assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
@@ -384,7 +380,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithNewTasks() throws Exception {
+ public void testAssignWithNewTasks() {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
builder.addSource("source3", "topic3");
@@ -430,13 +426,9 @@ public class StreamPartitionAssignorTest {
// check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
// also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
// then later ones will be re-assigned to other hosts due to load balancing
- Set<TaskId> allActiveTasks = new HashSet<>();
- Set<TopicPartition> allPartitions = new HashSet<>();
- AssignmentInfo info;
-
- info = AssignmentInfo.decode(assignments.get("consumer10").userData());
- allActiveTasks.addAll(info.activeTasks);
- allPartitions.addAll(assignments.get("consumer10").partitions());
+ AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData());
+ Set<TaskId> allActiveTasks = new HashSet<>(info.activeTasks);
+ Set<TopicPartition> allPartitions = new HashSet<>(assignments.get("consumer10").partitions());
info = AssignmentInfo.decode(assignments.get("consumer11").userData());
allActiveTasks.addAll(info.activeTasks);
@@ -451,7 +443,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithStates() throws Exception {
+ public void testAssignWithStates() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addSource("source1", "topic1");
@@ -551,7 +543,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithStandbyReplicas() throws Exception {
+ public void testAssignWithStandbyReplicas() {
Properties props = configProps();
props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
StreamsConfig config = new StreamsConfig(props);
@@ -600,13 +592,10 @@ public class StreamPartitionAssignorTest {
Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
- Set<TaskId> allActiveTasks = new HashSet<>();
- Set<TaskId> allStandbyTasks = new HashSet<>();
-
// the first consumer
AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
- allStandbyTasks.addAll(info10.standbyTasks.keySet());
+ Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks);
+ Set<TaskId> allStandbyTasks = new HashSet<>(info10.standbyTasks.keySet());
// the second consumer
AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
@@ -634,7 +623,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testOnAssignment() throws Exception {
+ public void testOnAssignment() {
TopicPartition t2p3 = new TopicPartition("topic2", 3);
TopologyBuilder builder = new TopologyBuilder();
@@ -677,7 +666,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithInternalTopics() throws Exception {
+ public void testAssignWithInternalTopics() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addInternalTopic("topicX");
@@ -722,7 +711,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
+ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
String applicationId = "test";
builder.setApplicationId(applicationId);
builder.addInternalTopic("topicX");
@@ -760,7 +749,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+ public void shouldAddUserDefinedEndPointToSubscription() {
final Properties properties = configProps();
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
final StreamsConfig config = new StreamsConfig(properties);
@@ -773,8 +762,8 @@ public class StreamPartitionAssignorTest {
final UUID uuid1 = UUID.randomUUID();
final String client1 = "client1";
- final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
- 0, stateDirectory);
+ final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1,
+ uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0, stateDirectory);
partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
@@ -783,7 +772,82 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+ final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ final Set<TaskId> emptyTasks = Collections.emptySet();
+ subscriptions.put(
+ "consumer1",
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+ )
+ );
+ subscriptions.put(
+ "consumer2",
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+ )
+ );
+
+ final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ StreamsConfig config = new StreamsConfig(configProps());
+
+ final TopologyBuilder builder = new TopologyBuilder();
+ final StreamThread streamThread = new StreamThread(
+ builder,
+ config,
+ mockClientSupplier,
+ "appId",
+ "clientId",
+ UUID.randomUUID(),
+ new Metrics(),
+ Time.SYSTEM,
+ new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+ 0,
+ stateDirectory);
+
+ partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", "clientId"));
+ final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+ assertEquals(2, assignment.size());
+ assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version);
+ assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version);
+ }
+
+ @Test
+ public void shouldDownGradeSubscription() {
+ final Properties properties = configProps();
+ properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
+ StreamsConfig config = new StreamsConfig(properties);
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source1", "topic1");
+
+ String clientId = "client-id";
+ final StreamThread streamThread = new StreamThread(
+ builder,
+ config,
+ mockClientSupplier,
+ "appId",
+ "clientId",
+ UUID.randomUUID(),
+ new Metrics(),
+ Time.SYSTEM,
+ new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+ 0,
+ stateDirectory);
+
+ StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", clientId));
+
+ PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+ assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version);
+ }
+
+ @Test
+ public void shouldMapUserEndPointToTopicPartitions() {
final Properties properties = configProps();
final String myEndPoint = "localhost:8080";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -831,7 +895,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
final Properties properties = configProps();
final String myEndPoint = "localhost";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -865,7 +929,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
final Properties properties = configProps();
final String myEndPoint = "localhost:j87yhk";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -897,7 +961,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+ public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
final Map<HostInfo, Set<TopicPartition>> hostState =
Collections.singletonMap(new HostInfo("localhost", 80),
@@ -910,7 +974,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldSetClusterMetadataOnAssignment() throws Exception {
+ public void shouldSetClusterMetadataOnAssignment() {
final List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
final Map<HostInfo, Set<TopicPartition>> hostState =
Collections.singletonMap(new HostInfo("localhost", 80),
@@ -930,7 +994,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+ public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
final Cluster cluster = partitionAssignor.clusterMetadata();
assertNotNull(cluster);
}
@@ -1039,11 +1103,11 @@ public class StreamPartitionAssignorTest {
new TopicPartition(applicationId + "-count-repartition", 1),
new TopicPartition(applicationId + "-count-repartition", 2)
);
- assertThat(new HashSet(assignment.get(client).partitions()), equalTo(new HashSet(expectedAssignment)));
+ assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment)));
}
@Test
- public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
+ public void shouldUpdatePartitionHostInfoMapOnAssignment() {
final TopicPartition partitionOne = new TopicPartition("topic", 1);
final TopicPartition partitionTwo = new TopicPartition("topic", 2);
final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
@@ -1060,7 +1124,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
+ public void shouldUpdateClusterMetadataOnAssignment() {
final TopicPartition topicOne = new TopicPartition("topic", 1);
final TopicPartition topicTwo = new TopicPartition("topic2", 2);
final Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(
@@ -1076,7 +1140,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
+ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
final Properties props = configProps();
props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
final StreamsConfig config = new StreamsConfig(props);
@@ -1135,12 +1199,12 @@ public class StreamPartitionAssignorTest {
}
@Test(expected = KafkaException.class)
- public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception {
+ public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
}
@Test(expected = KafkaException.class)
- public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws Exception {
+ public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() {
final Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream thread");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 9473a40..361dde8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -64,10 +64,9 @@ public class AssignmentInfoTest {
assertEquals(oldVersion.activeTasks, decoded.activeTasks);
assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
- assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+ assertEquals(1, decoded.version);
}
-
/**
* This is a clone of what the V1 encoding did. The encode method has changed for V2
* so it is impossible to test compatibility without having this
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 11e1ae8..3030615 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -77,7 +77,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
// This main() is not used by the system test. It is intended to be used for local debugging.
public static void main(String[] args) throws Exception {
final String kafka = "localhost:9092";
- final String zookeeper = "localhost:2181";
final File stateDir = TestUtils.tempDirectory();
final int numKeys = 20;
@@ -131,42 +130,50 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
+ return generate(kafka, numKeys, maxRecordsPerKey, true);
+ }
+ public static Map<String, Set<Integer>> generate(final String kafka,
+ final int numKeys,
+ final int maxRecordsPerKey,
+ final boolean autoTerminate) throws Exception {
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- // the next 4 config values make sure that all records are produced with no loss and
- // no duplicates
+ // the next 2 config values make sure that no records are lost (idempotence is not enabled here, so retries may still produce duplicates)
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
+ final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
int numRecordsProduced = 0;
- Map<String, Set<Integer>> allData = new HashMap<>();
- ValueList[] data = new ValueList[numKeys];
+ final Map<String, Set<Integer>> allData = new HashMap<>();
+ final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
allData.put(data[i].key, new HashSet<Integer>());
}
- Random rand = new Random();
+ final Random rand = new Random();
- int remaining = data.length;
+ int remaining = 1; // dummy value must be positive if <autoTerminate> is false
+ if (autoTerminate) {
+ remaining = data.length;
+ }
while (remaining > 0) {
- int index = rand.nextInt(remaining);
- String key = data[index].key;
+ final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
+ final String key = data[index].key;
int value = data[index].next();
- if (value < 0) {
+ if (autoTerminate && value < 0) {
remaining--;
data[index] = data[remaining];
} else {
- ProducerRecord<byte[], byte[]> record =
- new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
+ final ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
producer.send(record, new Callback() {
@Override
@@ -178,11 +185,12 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
});
-
numRecordsProduced++;
allData.get(key).add(value);
- if (numRecordsProduced % 100 == 0)
+
+ if (numRecordsProduced % 100 == 0) {
System.out.println(numRecordsProduced + " records produced");
+ }
Utils.sleep(2);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 150ec7d..11845b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -44,20 +44,15 @@ public class SmokeTestUtil {
public Processor<Object, Object> get() {
return new AbstractProcessor<Object, Object>() {
private int numRecordsProcessed = 0;
- private ProcessorContext context;
@Override
public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
- this.context = context;
}
@Override
public void process(final Object key, final Object value) {
- if (printOffset) {
- System.out.println(">>> " + context.offset());
- }
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
@@ -65,10 +60,10 @@ public class SmokeTestUtil {
}
@Override
- public void punctuate(final long timestamp) { }
+ public void punctuate(final long timestamp) {}
@Override
- public void close() { }
+ public void close() {}
};
}
};
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 244aa8e..699aaeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -23,7 +23,7 @@ import java.util.Set;
public class StreamsSmokeTest {
/**
- * args ::= command kafka zookeeper stateDir
+ * args ::= kafka stateDir command [disableAutoTerminate]
* command := "run" | "process"
*
* @param args
@@ -32,11 +32,13 @@ public class StreamsSmokeTest {
String kafka = args[0];
String stateDir = args.length > 1 ? args[1] : null;
String command = args.length > 2 ? args[2] : null;
+ boolean disableAutoTerminate = args.length > 3;
- System.out.println("StreamsTest instance started");
+ System.out.println("StreamsTest instance started (StreamsSmokeTest)");
System.out.println("command=" + command);
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
+ System.out.println("disableAutoTerminate=" + disableAutoTerminate);
switch (command) {
case "standalone":
@@ -46,8 +48,12 @@ public class StreamsSmokeTest {
// this starts the driver (data generation and result verification)
final int numKeys = 10;
final int maxRecordsPerKey = 500;
- Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
- SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+ if (disableAutoTerminate) {
+ SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
+ } else {
+ Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+ SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+ }
break;
case "process":
// this starts a KafkaStreams client
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..0ee47e4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 2) {
+ System.err.println("StreamsUpgradeTest requires two argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] : ""));
+ }
+ final String kafka = args[0];
+ final String stateDir = args[1];
+ final String upgradeFrom = args.length > 2 ? args[2] : null;
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("stateDir=" + stateDir);
+ System.out.println("upgradeFrom=" + upgradeFrom);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ if (upgradeFrom != null) {
+ config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+ }
+
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+}
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..72d7f5a
--- /dev/null
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 3) {
+ System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] + " " : "")
+ + (args.length > 1 ? args[1] : ""));
+ }
+ final String kafka = args[0];
+ final String zookeeper = args[1];
+ final String stateDir = args[2];
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("zookeeper=" + zookeeper);
+ System.out.println("stateDir=" + stateDir);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(printProcessorSupplier());
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ return new ProcessorSupplier<K, V>() {
+ public Processor<K, V> get() {
+ return new AbstractProcessor<K, V>() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+ }
+}
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..eebd0fa
--- /dev/null
+++ b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ /**
+ * This test cannot be executed as long as Kafka 0.10.1.2 is not released
+ */
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 3) {
+ System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] + " " : "")
+ + (args.length > 1 ? args[1] : ""));
+ }
+ final String kafka = args[0];
+ final String zookeeper = args[1];
+ final String stateDir = args[2];
+ final String upgradeFrom = args.length > 3 ? args[3] : null;
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.1)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("zookeeper=" + zookeeper);
+ System.out.println("stateDir=" + stateDir);
+ System.out.println("upgradeFrom=" + upgradeFrom);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(printProcessorSupplier());
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ if (upgradeFrom != null) {
+ // TODO: Kafka 0.10.1.2 is not released yet, so `UPGRADE_FROM_CONFIG` is not available yet
+ //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+ config.setProperty("upgrade.from", upgradeFrom);
+ }
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ return new ProcessorSupplier<K, V>() {
+ public Processor<K, V> get() {
+ return new AbstractProcessor<K, V>() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+ }
+}
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..18240f0
--- /dev/null
+++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ /**
+ * This test cannot be executed as long as Kafka 0.10.2.2 is not released
+ */
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 2) {
+ System.err.println("StreamsUpgradeTest requires three argument (kafka-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] : ""));
+ }
+ final String kafka = args[0];
+ final String stateDir = args[1];
+ final String upgradeFrom = args.length > 2 ? args[2] : null;
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.2)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("stateDir=" + stateDir);
+ System.out.println("upgradeFrom=" + upgradeFrom);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(printProcessorSupplier());
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ if (upgradeFrom != null) {
+ // TODO: Kafka 0.10.2.2 is not released yet, so `UPGRADE_FROM_CONFIG` is not available yet
+ //config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+ config.setProperty("upgrade.from", upgradeFrom);
+ }
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ return new ProcessorSupplier<K, V>() {
+ public Processor<K, V> get() {
+ return new AbstractProcessor<K, V>() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+ }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e6f692b..eeb1681 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -20,6 +20,7 @@ from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
class StreamsTestBaseService(KafkaPathResolverMixin, Service):
@@ -33,6 +34,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
+ CLEAN_NODE_ENABLED = True
+
logs = {
"streams_log": {
"path": LOG_FILE,
@@ -43,6 +46,114 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
+ "streams_log.0-1": {
+ "path": LOG_FILE + ".0-1",
+ "collect_default": True},
+ "streams_stdout.0-1": {
+ "path": STDOUT_FILE + ".0-1",
+ "collect_default": True},
+ "streams_stderr.0-1": {
+ "path": STDERR_FILE + ".0-1",
+ "collect_default": True},
+ "streams_log.0-2": {
+ "path": LOG_FILE + ".0-2",
+ "collect_default": True},
+ "streams_stdout.0-2": {
+ "path": STDOUT_FILE + ".0-2",
+ "collect_default": True},
+ "streams_stderr.0-2": {
+ "path": STDERR_FILE + ".0-2",
+ "collect_default": True},
+ "streams_log.0-3": {
+ "path": LOG_FILE + ".0-3",
+ "collect_default": True},
+ "streams_stdout.0-3": {
+ "path": STDOUT_FILE + ".0-3",
+ "collect_default": True},
+ "streams_stderr.0-3": {
+ "path": STDERR_FILE + ".0-3",
+ "collect_default": True},
+ "streams_log.0-4": {
+ "path": LOG_FILE + ".0-4",
+ "collect_default": True},
+ "streams_stdout.0-4": {
+ "path": STDOUT_FILE + ".0-4",
+ "collect_default": True},
+ "streams_stderr.0-4": {
+ "path": STDERR_FILE + ".0-4",
+ "collect_default": True},
+ "streams_log.0-5": {
+ "path": LOG_FILE + ".0-5",
+ "collect_default": True},
+ "streams_stdout.0-5": {
+ "path": STDOUT_FILE + ".0-5",
+ "collect_default": True},
+ "streams_stderr.0-5": {
+ "path": STDERR_FILE + ".0-5",
+ "collect_default": True},
+ "streams_log.0-6": {
+ "path": LOG_FILE + ".0-6",
+ "collect_default": True},
+ "streams_stdout.0-6": {
+ "path": STDOUT_FILE + ".0-6",
+ "collect_default": True},
+ "streams_stderr.0-6": {
+ "path": STDERR_FILE + ".0-6",
+ "collect_default": True},
+ "streams_log.1-1": {
+ "path": LOG_FILE + ".1-1",
+ "collect_default": True},
+ "streams_stdout.1-1": {
+ "path": STDOUT_FILE + ".1-1",
+ "collect_default": True},
+ "streams_stderr.1-1": {
+ "path": STDERR_FILE + ".1-1",
+ "collect_default": True},
+ "streams_log.1-2": {
+ "path": LOG_FILE + ".1-2",
+ "collect_default": True},
+ "streams_stdout.1-2": {
+ "path": STDOUT_FILE + ".1-2",
+ "collect_default": True},
+ "streams_stderr.1-2": {
+ "path": STDERR_FILE + ".1-2",
+ "collect_default": True},
+ "streams_log.1-3": {
+ "path": LOG_FILE + ".1-3",
+ "collect_default": True},
+ "streams_stdout.1-3": {
+ "path": STDOUT_FILE + ".1-3",
+ "collect_default": True},
+ "streams_stderr.1-3": {
+ "path": STDERR_FILE + ".1-3",
+ "collect_default": True},
+ "streams_log.1-4": {
+ "path": LOG_FILE + ".1-4",
+ "collect_default": True},
+ "streams_stdout.1-4": {
+ "path": STDOUT_FILE + ".1-4",
+ "collect_default": True},
+ "streams_stderr.1-4": {
+ "path": STDERR_FILE + ".1-4",
+ "collect_default": True},
+ "streams_log.1-5": {
+ "path": LOG_FILE + ".1-5",
+ "collect_default": True},
+ "streams_stdout.1-5": {
+ "path": STDOUT_FILE + ".1-5",
+ "collect_default": True},
+ "streams_stderr.1-5": {
+ "path": STDERR_FILE + ".1-5",
+ "collect_default": True},
+ "streams_log.1-6": {
+ "path": LOG_FILE + ".1-6",
+ "collect_default": True},
+ "streams_stdout.1-6": {
+ "path": STDOUT_FILE + ".1-6",
+ "collect_default": True},
+ "streams_stderr.1-6": {
+ "path": STDERR_FILE + ".1-6",
+ "collect_default": True},
}
def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1=None, user_test_args2=None):
@@ -107,7 +218,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
- node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+ if self.CLEAN_NODE_ENABLED:
+ node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
def start_cmd(self, node):
args = self.args.copy()
@@ -163,7 +275,28 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
+ self.DISABLE_AUTO_TERMINATE = ""
+
+ def disable_auto_terminate(self):
+ self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['kafka'] = self.kafka.bootstrap_servers()
+ args['state_dir'] = self.PERSISTENT_ROOT
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+ cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
+ " %(kafka)s %(state_dir)s %(user_test_args)s %(disable_auto_terminate)s" \
+ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+ return cmd
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
@@ -206,3 +339,41 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService):
kafka,
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
eosEnabled)
+
+class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
+ def __init__(self, test_context, kafka):
+ super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
+ kafka,
+ "org.apache.kafka.streams.tests.StreamsUpgradeTest",
+ "")
+ self.UPGRADE_FROM = ""
+
+ def set_version(self, kafka_streams_version):
+ self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+ def set_upgrade_from(self, upgrade_from):
+ self.UPGRADE_FROM = upgrade_from
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['kafka'] = self.kafka.bootstrap_servers()
+ if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
+ args['zk'] = self.kafka.zk.connect_setting()
+ else:
+ args['zk'] = ""
+ args['state_dir'] = self.PERSISTENT_ROOT
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['version'] = self.KAFKA_STREAMS_VERSION
+ args['upgrade_from'] = self.UPGRADE_FROM
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+ cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+ " %(kafka_run_class)s %(streams_class_name)s " \
+ " %(kafka)s %(zk)s %(state_dir)s %(user_test_args)s %(upgrade_from)s" \
+ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+ return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 0000000..7aa2de6
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,246 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsUpgradeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, DEV_VERSION
+import random
+
+class StreamsUpgradeTest(KafkaTest):
+ """
+ Test upgrading Kafka Streams (all version combinations)
+ If metadata was changed, the upgrade is more difficult
+ Metadata version was bumped in 0.10.1.0
+ """
+
+ def __init__(self, test_context):
+ super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+ 'echo' : { 'partitions': 5 },
+ 'data' : { 'partitions': 5 }
+ })
+
+ self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+ self.driver.disable_auto_terminate()
+ self.processor1 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+ self.processor2 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+ self.processor3 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+
+ @parametrize(old_version=str(LATEST_0_10_1), new_version=str(LATEST_0_10_2))
+ @parametrize(old_version=str(LATEST_0_10_1), new_version=str(DEV_VERSION))
+ @parametrize(old_version=str(LATEST_0_10_2), new_version=str(DEV_VERSION))
+ def test_simple_upgrade(self, old_version, new_version):
+ """
+ Starts 3 KafkaStreams instances with <old_version>, and upgrades one-by-one to <new_version>
+ """
+
+ self.driver.start()
+ self.start_all_nodes_with(old_version)
+
+ self.processors = [self.processor1, self.processor2, self.processor3]
+
+ counter = 1
+ random.seed()
+
+ random.shuffle(self.processors)
+ for p in self.processors:
+ p.CLEAN_NODE_ENABLED = False
+ self.do_rolling_bounce(p, "", new_version, counter)
+ counter = counter + 1
+
+ # shutdown
+ self.driver.stop()
+ self.driver.wait()
+
+ random.shuffle(self.processors)
+ for p in self.processors:
+ node = p.node
+ with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+ p.stop()
+ monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+ timeout_sec=60,
+ err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+ self.driver.stop()
+
+ #@parametrize(new_version=str(LATEST_0_10_1)) we cannot run this test until Kafka 0.10.1.2 is released
+ #@parametrize(new_version=str(LATEST_0_10_2)) we cannot run this test until Kafka 0.10.2.2 is released
+ @parametrize(new_version=str(DEV_VERSION))
+ def test_metadata_upgrade(self, new_version):
+ """
+ Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to <new_version>
+ """
+
+ self.driver.start()
+ self.start_all_nodes_with(str(LATEST_0_10_0))
+
+ self.processors = [self.processor1, self.processor2, self.processor3]
+
+ counter = 1
+ random.seed()
+
+ # first rolling bounce
+ random.shuffle(self.processors)
+ for p in self.processors:
+ p.CLEAN_NODE_ENABLED = False
+ self.do_rolling_bounce(p, "0.10.0", new_version, counter)
+ counter = counter + 1
+
+ # second rolling bounce
+ random.shuffle(self.processors)
+ for p in self.processors:
+ self.do_rolling_bounce(p, "", new_version, counter)
+ counter = counter + 1
+
+ # shutdown
+ self.driver.stop()
+ self.driver.wait()
+
+ random.shuffle(self.processors)
+ for p in self.processors:
+ node = p.node
+ with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+ p.stop()
+ monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+ timeout_sec=60,
+ err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+ self.driver.stop()
+
+ def start_all_nodes_with(self, version):
+ # start first with <version>
+ self.prepare_for(self.processor1, version)
+ node1 = self.processor1.node
+ with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+ with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
+ self.processor1.start()
+ log_monitor.wait_until("Kafka version : " + version,
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
+ monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+
+ # start second with <version>
+ self.prepare_for(self.processor2, version)
+ node2 = self.processor2.node
+ with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+ with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+ with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
+ self.processor2.start()
+ log_monitor.wait_until("Kafka version : " + version,
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams version " + version + " " + str(node2.account))
+ first_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+ second_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+
+ # start third with <version>
+ self.prepare_for(self.processor3, version)
+ node3 = self.processor3.node
+ with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+ with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+ with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
+ with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
+ self.processor3.start()
+ log_monitor.wait_until("Kafka version : " + version,
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams version " + version + " " + str(node3.account))
+ first_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+ second_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+ third_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
+
+ @staticmethod
+ def prepare_for(processor, version):
+ processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+ processor.set_version(version)
+
+ def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
+ first_other_processor = None
+ second_other_processor = None
+ for p in self.processors:
+ if p != processor:
+ if first_other_processor is None:
+ first_other_processor = p
+ else:
+ second_other_processor = p
+
+ node = processor.node
+ first_other_node = first_other_processor.node
+ second_other_node = second_other_processor.node
+
+ # stop processor and wait for rebalance of others
+ with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+ with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+ processor.stop()
+ first_other_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+ second_other_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+ node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+ if upgrade_from == "": # upgrade disabled -- second round of rolling bounces
+ roll_counter = ".1-" # second round of rolling bounces
+ else:
+ roll_counter = ".0-" # first round of rolling bounces
+
+ node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+ node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+ node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
+
+ if new_version == str(DEV_VERSION):
+ processor.set_version("") # set to TRUNK
+ else:
+ processor.set_version(new_version)
+ processor.set_upgrade_from(upgrade_from)
+
+ grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
+ with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+ with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+ with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+ processor.start()
+
+ log_monitor.wait_until("Kafka version : " + new_version,
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams version " + new_version + " " + str(node.account))
+ first_other_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+ found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
+ if len(found) > 0:
+ raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+ second_other_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+ found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
+ if len(found) > 0:
+ raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+ monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index f63a7c1..94ba100 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -61,6 +61,7 @@ def get_version(node=None):
return DEV_BRANCH
DEV_BRANCH = KafkaVersion("dev")
+DEV_VERSION = KafkaVersion("0.11.0.3-SNAPSHOT")
# 0.8.2.X versions
V_0_8_2_1 = KafkaVersion("0.8.2.1")
@@ -91,5 +92,7 @@ LATEST_0_10 = LATEST_0_10_2
# 0.11.0.0 versions
V_0_11_0_0 = KafkaVersion("0.11.0.0")
-LATEST_0_11_0 = V_0_11_0_0
+V_0_11_0_1 = KafkaVersion("0.11.0.1")
+V_0_11_0_2 = KafkaVersion("0.11.0.2")
+LATEST_0_11_0 = V_0_11_0_2
LATEST_0_11 = LATEST_0_11_0
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 4c0add5..28b81ed 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -64,6 +64,8 @@ get_kafka() {
kafka_dir=/opt/kafka-$version
url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz
+ # the .tgz above does not include the streams test jar hence we need to get it separately
+ url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
if [ ! -d /opt/kafka-$version ]; then
pushd /tmp
curl -O $url
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.