You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/27 04:43:42 UTC
[kafka] branch 0.10.1 updated: KAFKA-6054: Fix upgrade path from
Kafka Streams v0.10.0 (#4746)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.10.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.10.1 by this push:
new faac933 KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4746)
faac933 is described below
commit faac933aeee5d976008a17248650943923728408
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Mar 26 21:43:38 2018 -0700
KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 (#4746)
Introduces new config parameter `upgrade.from`.
Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
bin/kafka-run-class.sh | 38 +++-
build.gradle | 13 ++
.../consumer/internals/AbstractCoordinator.java | 2 +-
.../apache/kafka/common/protocol/types/Struct.java | 2 +-
.../authenticator/SaslClientCallbackHandler.java | 11 +-
.../kafka/clients/producer/ProducerRecordTest.java | 12 +-
docs/upgrade.html | 17 ++
gradle/dependencies.gradle | 2 +
gradle/rat.gradle | 14 +-
jenkins.sh | 20 +++
settings.gradle | 4 +-
.../org/apache/kafka/streams/StreamsConfig.java | 26 ++-
.../kafka/streams/processor/TopologyBuilder.java | 2 +-
.../internals/StreamPartitionAssignor.java | 24 ++-
.../internals/assignment/AssignmentInfo.java | 7 +-
.../internals/assignment/SubscriptionInfo.java | 5 +-
.../org/apache/kafka/streams/KafkaStreamsTest.java | 28 +--
.../apache/kafka/streams/StreamsConfigTest.java | 42 +++--
.../streams/integration/FanoutIntegrationTest.java | 5 +-
.../KStreamAggregationDedupIntegrationTest.java | 8 +-
.../KStreamAggregationIntegrationTest.java | 29 ++-
.../streams/integration/ResetIntegrationTest.java | 2 +-
.../internals/StreamPartitionAssignorTest.java | 97 ++++++++--
.../processor/internals/StreamThreadTest.java | 12 +-
.../internals/assignment/AssignmentInfoTest.java | 3 +-
.../kafka/streams/smoketest/SmokeTestClient.java | 10 +-
.../kafka/streams/smoketest/SmokeTestDriver.java | 33 ++--
.../kafka/streams/smoketest/SmokeTestUtil.java | 23 +--
.../kafka/streams/smoketest/StreamsSmokeTest.java | 14 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 78 ++++++++
.../kafka/test/ProcessorTopologyTestDriver.java | 2 +-
.../kafka/streams/tests/StreamsUpgradeTest.java | 104 +++++++++++
tests/kafkatest/services/streams.py | 171 +++++++++++++++++-
.../tests/streams/streams_upgrade_test.py | 200 +++++++++++++++++++++
tests/kafkatest/version.py | 9 +-
vagrant/base.sh | 2 +
36 files changed, 904 insertions(+), 167 deletions(-)
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 1f5140b..77123ff 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,48 @@ do
fi
done
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
- if should_include_file "$file"; then
- CLASSPATH="$CLASSPATH":"$file"
- fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+ clients_lib_dir=$(dirname $0)/../clients/build/libs
+ streams_lib_dir=$(dirname $0)/../streams/build/libs
+ rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+ clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+ streams_lib_dir=$clients_lib_dir
+ rocksdb_lib_dir=$streams_lib_dir
+fi
+
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
do
if should_include_file "$file"; then
CLASSPATH="$CLASSPATH":"$file"
fi
done
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+ for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+ do
+ if should_include_file "$file"; then
+ CLASSPATH="$CLASSPATH":"$file"
+ fi
+ done
+else
+ for file in "$base_dir"/streams/upgrade-system-tests-0100/build/libs/kafka-streams-upgrade-system-tests*.jar;
+ do
+ if should_include_file "$file"; then
+ CLASSPATH="$CLASSPATH":"$file"
+ fi
+ done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
do
CLASSPATH="$CLASSPATH":"$file"
done
diff --git a/build.gradle b/build.gradle
index d221d96..2a54021 100644
--- a/build.gradle
+++ b/build.gradle
@@ -776,6 +776,19 @@ project(':streams:examples') {
}
}
+project(':streams:upgrade-system-tests-0100') {
+ archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+ dependencies {
+ testCompile libs.kafkaStreams_0100
+ }
+
+ systemTestLibs {
+ dependsOn testJar
+ }
+}
+
+
project(':log4j-appender') {
archivesBaseName = "kafka-log4j-appender"
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 93b92bb..dbbb912 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -918,7 +918,7 @@ public abstract class AbstractCoordinator implements Closeable {
log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
this.failed.set(new RuntimeException(e));
} catch (RuntimeException e) {
- log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e);
+ log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e);
this.failed.set(e);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 212d701..7488748 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -316,7 +316,7 @@ public class Struct {
Field f = this.schema.get(i);
if (f.type() instanceof ArrayOf) {
if (this.get(f) != null) {
- Object[] arrayObject = (Object []) this.get(f);
+ Object[] arrayObject = (Object[]) this.get(f);
for (Object arrayItem: arrayObject)
result = prime * result + arrayItem.hashCode();
}
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 8e0b8db..b80dfcc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,7 +17,9 @@
*/
package org.apache.kafka.common.security.authenticator;
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
@@ -26,10 +28,7 @@ import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
/**
* Callback handler for Sasl clients. The callbacks required for the SASL mechanism
@@ -59,7 +58,7 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
nc.setName(nc.getDefaultName());
} else if (callback instanceof PasswordCallback) {
if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
- char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
+ char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
((PasswordCallback) callback).setPassword(password);
} else {
String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index a844bb0..5186d05 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -26,24 +26,24 @@ public class ProducerRecordTest {
@Test
public void testEqualsAndHashCode() {
- ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1 , "key", 1);
+ ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1, "key", 1);
assertEquals(producerRecord, producerRecord);
assertEquals(producerRecord.hashCode(), producerRecord.hashCode());
- ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1 , "key", 1);
+ ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1, "key", 1);
assertEquals(producerRecord, equalRecord);
assertEquals(producerRecord.hashCode(), equalRecord.hashCode());
- ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1 , "key", 1);
+ ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1, "key", 1);
assertFalse(producerRecord.equals(topicMisMatch));
- ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2 , "key", 1);
+ ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2, "key", 1);
assertFalse(producerRecord.equals(partitionMismatch));
- ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1 , "key-1", 1);
+ ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1, "key-1", 1);
assertFalse(producerRecord.equals(keyMisMatch));
- ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1 , "key", 2);
+ ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1, "key", 2);
assertFalse(producerRecord.equals(valueMisMatch));
ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null);
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e6b9747..faa96c1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -55,6 +55,23 @@ only support 0.10.1.x or later brokers while 0.10.1.x brokers also support older
<h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Streams API changes in 0.10.1.0</a></h5>
<ul>
+ <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+ (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+ As an alternative, an offline upgrade is also possible.
+ <ul>
+ <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
+ <li> bounce each instance of your application once </li>
+ <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.mode</code> </li>
+ <li> bounce each instance of your application once more to complete the upgrade </li>
+ </ul>
+ </li>
+ <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+ <ul>
+ <li> stop all old (0.10.0.x) application instances </li>
+ <li> update your code and swap old code and jar file with new code and new jar file </li>
+ <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
+ </ul>
+ </li>
<li> Stream grouping and aggregation split into two methods:
<ul>
<li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 2ff459f..07944a9 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -31,6 +31,7 @@ versions += [
jackson: "2.6.3",
jetty: "9.2.22.v20170606",
jersey: "2.22.2",
+ kafka_0100: "0.10.0.1",
log4j: "1.2.17",
jopt: "4.9",
junit: "4.12",
@@ -91,6 +92,7 @@ libs += [
junit: "junit:junit:$versions.junit",
log4j: "log4j:log4j:$versions.log4j",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
+ kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
lz4: "net.jpountz.lz4:lz4:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
powermock: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
index d62b372..a51876c 100644
--- a/gradle/rat.gradle
+++ b/gradle/rat.gradle
@@ -84,9 +84,15 @@ class RatTask extends DefaultTask {
if (!reportDir.exists()) {
reportDir.mkdirs()
}
- generateXmlReport(reportDir)
- printUnknownFiles()
- generateHtmlReport()
+ def origEncoding = System.getProperty("file.encoding")
+ try {
+ System.setProperty("file.encoding", "UTF-8") //affects the output of the ant rat task
+ generateXmlReport(reportDir)
+ printUnknownFiles()
+ generateHtmlReport()
+ } finally {
+ System.setProperty("file.encoding", origEncoding)
+ }
}
}
@@ -109,7 +115,7 @@ class RatPlugin implements Plugin<Project> {
mavenCentral()
}
project.dependencies {
- rat 'org.apache.rat:apache-rat-tasks:0.11'
+ rat 'org.apache.rat:apache-rat-tasks:0.12'
}
}
}
diff --git a/jenkins.sh b/jenkins.sh
new file mode 100755
index 0000000..c21eb1d
--- /dev/null
+++ b/jenkins.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script is used for verifying changes in Jenkins. In order to provide faster feedback, the tasks are ordered so
+# that faster tasks are executed in every module before slower tasks (if possible). For example, the unit tests for all
+# the modules are executed before the integration tests.
+./gradlew clean compileJava compileScala compileTestJava compileTestScala checkstyleMain checkstyleTest test rat --no-daemon -PxmlFindBugsReport=true -PtestLoggingEvents=started,passed,skipped,failed "$@"
diff --git a/settings.gradle b/settings.gradle
index d430c2f..f3a1b81 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
- 'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100',
+ 'log4j-appender', 'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5ba4383..e33efef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
/**
* Configuration for Kafka Streams. Documentation for these configurations can be found in the <a
@@ -54,6 +55,16 @@ public class StreamsConfig extends AbstractConfig {
// Prefix used to isolate producer configs from consumer configs.
public static final String PRODUCER_PREFIX = "producer.";
+ /**
+ * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
+ */
+ public static final String UPGRADE_FROM_0100 = "0.10.0";
+
+ /** {@code upgrade.from} */
+ public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+ public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+ "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+
/** <code>state.dir</code> */
public static final String STATE_DIR_CONFIG = "state.dir";
private static final String STATE_DIR_DOC = "Directory location for state store.";
@@ -257,14 +268,19 @@ public class StreamsConfig extends AbstractConfig {
10 * 1024 * 1024L,
atLeast(0),
Importance.LOW,
- CACHE_MAX_BYTES_BUFFERING_DOC);
+ CACHE_MAX_BYTES_BUFFERING_DOC)
+ .define(UPGRADE_FROM_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ in(null, UPGRADE_FROM_0100),
+ ConfigDef.Importance.LOW,
+ UPGRADE_FROM_DOC);
}
// this is the list of configs for underlying clients
// that streams prefer different default values
private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
- static
- {
+ static {
Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
@@ -272,8 +288,7 @@ public class StreamsConfig extends AbstractConfig {
}
private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
- static
- {
+ static {
Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -342,6 +357,7 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
// add configs required for stream partition assignor
+ consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f1f63..bfca83a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -676,7 +676,7 @@ public class TopologyBuilder {
}
}
- private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
+ private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
final Set<String> sourceTopics = new HashSet<>();
for (String parent : parents) {
NodeFactory nodeFactory = nodeFactories.get(parent);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index dcba543..e6c407f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -66,7 +65,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
public final TaskId taskId;
public final TopicPartition partition;
- public AssignedPartition(TaskId taskId, TopicPartition partition) {
+ AssignedPartition(final TaskId taskId, final TopicPartition partition) {
this.taskId = taskId;
this.partition = partition;
}
@@ -92,6 +91,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private StreamThread streamThread;
+ private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
private int numStandbyReplicas;
private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
@@ -111,6 +111,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
public void configure(Map<String, ?> configs) {
numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+ final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+ if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
+ log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
+ userMetadataVersion = 1;
+ }
Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
if (o == null) {
@@ -174,7 +179,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
Set<TaskId> prevTasks = streamThread.prevTasks();
Set<TaskId> standbyTasks = streamThread.cachedTasks();
standbyTasks.removeAll(prevTasks);
- SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
+ SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
if (streamThread.builder.sourceTopicPattern() != null) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -265,12 +270,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
Map<UUID, ClientState<TaskId>> states = new HashMap<>();
Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
// decode subscription info
+ int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
String consumerId = entry.getKey();
Subscription subscription = entry.getValue();
-
SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+ final int usedVersion = info.version;
+ if (usedVersion < minUserMetadataVersion) {
+ minUserMetadataVersion = usedVersion;
+ }
if (info.userEndPoint != null) {
final String[] hostPort = info.userEndPoint.split(":");
consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
@@ -460,6 +469,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
assignmentSuppliers.add(new AssignmentSupplier(consumer,
+ minUserMetadataVersion,
active,
standby,
endPointMap,
@@ -483,17 +493,20 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
class AssignmentSupplier {
private final String consumer;
+ private final int metadataVersion;
private final List<TaskId> active;
private final Map<TaskId, Set<TopicPartition>> standby;
private final Map<HostInfo, Set<TopicPartition>> endPointMap;
private final List<TopicPartition> activePartitions;
AssignmentSupplier(final String consumer,
+ final int metadataVersion,
final List<TaskId> active,
final Map<TaskId, Set<TopicPartition>> standby,
final Map<HostInfo, Set<TopicPartition>> endPointMap,
final List<TopicPartition> activePartitions) {
this.consumer = consumer;
+ this.metadataVersion = metadataVersion;
this.active = active;
this.standby = standby;
this.endPointMap = endPointMap;
@@ -501,7 +514,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
Assignment get() {
- return new Assignment(activePartitions, new AssignmentInfo(active,
+ return new Assignment(activePartitions, new AssignmentInfo(metadataVersion,
+ active,
standby,
endPointMap).encode());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 6569f85..ce9aa63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.common.record.ByteBufferInputStream;
@@ -56,7 +55,7 @@ public class AssignmentInfo {
this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
}
- protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+ public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
this.version = version;
this.activeTasks = activeTasks;
@@ -155,9 +154,7 @@ public class AssignmentInfo {
}
}
- return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
-
-
+ return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions);
} catch (IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index c3481c0..92c50a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -32,7 +31,7 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
- private static final int CURRENT_VERSION = 2;
+ public static final int CURRENT_VERSION = 2;
public final int version;
public final UUID processId;
@@ -44,7 +43,7 @@ public class SubscriptionInfo {
this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
}
- private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+ public SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
this.version = version;
this.processId = processId;
this.prevTasks = prevTasks;
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 35b88db..e4ba9cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -39,11 +40,12 @@ public class KafkaStreamsTest {
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@Test
- public void testStartAndClose() throws Exception {
+ public void testStartAndClose() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
@@ -62,11 +64,12 @@ public class KafkaStreamsTest {
}
@Test
- public void testCloseIsIdempotent() throws Exception {
+ public void testCloseIsIdempotent() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -79,10 +82,11 @@ public class KafkaStreamsTest {
}
@Test(expected = IllegalStateException.class)
- public void testCannotStartOnceClosed() throws Exception {
+ public void testCannotStartOnceClosed() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -99,10 +103,11 @@ public class KafkaStreamsTest {
}
@Test(expected = IllegalStateException.class)
- public void testCannotStartTwice() throws Exception {
+ public void testCannotStartTwice() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -119,25 +124,25 @@ public class KafkaStreamsTest {
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+ public void shouldNotGetAllTasksWhenNotRunning() {
final KafkaStreams streams = createKafkaStreams();
streams.allMetadata();
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+ public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
final KafkaStreams streams = createKafkaStreams();
streams.allMetadataForStore("store");
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+ public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
final KafkaStreams streams = createKafkaStreams();
streams.metadataForKey("store", "key", Serdes.String().serializer());
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+ public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
final KafkaStreams streams = createKafkaStreams();
streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
@Override
@@ -152,16 +157,18 @@ public class KafkaStreamsTest {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final KStreamBuilder builder = new KStreamBuilder();
return new KafkaStreams(builder, props);
}
@Test
- public void testCleanup() throws Exception {
+ public void testCleanup() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -173,10 +180,11 @@ public class KafkaStreamsTest {
}
@Test(expected = IllegalStateException.class)
- public void testCannotCleanupWhileRunning() throws Exception {
+ public void testCannotCleanupWhileRunning() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f03bed9..9d40148 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -58,7 +58,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetProducerConfigs() throws Exception {
+ public void testGetProducerConfigs() {
Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer");
assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
@@ -66,7 +66,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetConsumerConfigs() throws Exception {
+ public void testGetConsumerConfigs() {
Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
@@ -75,7 +75,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetRestoreConsumerConfigs() throws Exception {
+ public void testGetRestoreConsumerConfigs() {
Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
@@ -84,7 +84,7 @@ public class StreamsConfigTest {
@Test
public void defaultSerdeShouldBeConfigured() {
- Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+ Map<String, Object> serializerConfigs = new HashMap<>();
serializerConfigs.put("key.serializer.encoding", "UTF8");
serializerConfigs.put("value.serializer.encoding", "UTF-16");
Serializer<String> serializer = Serdes.String().serializer();
@@ -115,7 +115,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+ public void shouldSupportPrefixedConsumerConfigs() {
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -125,7 +125,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+ public void shouldSupportPrefixedRestoreConsumerConfigs() {
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -135,7 +135,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -143,7 +143,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -151,7 +151,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(producerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -160,7 +160,7 @@ public class StreamsConfigTest {
@Test
- public void shouldSupportPrefixedProducerConfigs() throws Exception {
+ public void shouldSupportPrefixedProducerConfigs() {
props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -170,7 +170,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+ public void shouldBeSupportNonPrefixedConsumerConfigs() {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -180,7 +180,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+ public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -190,7 +190,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+ public void shouldSupportNonPrefixedProducerConfigs() {
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -199,24 +199,22 @@ public class StreamsConfigTest {
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
-
-
@Test(expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+ public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.keySerde();
}
@Test(expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+ public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.valueSerde();
}
@Test
- public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+ public void shouldOverrideStreamsDefaultConsumerConfigs() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -226,7 +224,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+ public void shouldOverrideStreamsDefaultProducerConfigs() {
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
@@ -234,7 +232,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+ public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -244,14 +242,14 @@ public class StreamsConfigTest {
}
@Test(expected = ConfigException.class)
- public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+ public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getConsumerConfigs(null, "a", "b");
}
@Test(expected = ConfigException.class)
- public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+ public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.getRestoreConsumerConfigs("client");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index a5fb076..88098dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.TestUtils;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -79,13 +80,12 @@ public class FanoutIntegrationTest {
private static final String OUTPUT_TOPIC_C = "C";
@BeforeClass
- public static void startKafkaCluster() throws Exception {
+ public static void startKafkaCluster() {
CLUSTER.createTopic(INPUT_TOPIC_A);
CLUSTER.createTopic(OUTPUT_TOPIC_B);
CLUSTER.createTopic(OUTPUT_TOPIC_C);
}
-
@Parameter
public long cacheSizeBytes;
@@ -117,6 +117,7 @@ public class FanoutIntegrationTest {
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+ streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
final KStream<byte[], String> stream2 = stream1.mapValues(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index ab08dbe..eeb455b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -122,8 +122,8 @@ public class KStreamAggregationDedupIntegrationTest {
List<KeyValue<String, String>> results = receiveMessages(
new StringDeserializer(),
- new StringDeserializer()
- , 5);
+ new StringDeserializer(),
+ 5);
Collections.sort(results, new Comparator<KeyValue<String, String>>() {
@Override
@@ -172,8 +172,8 @@ public class KStreamAggregationDedupIntegrationTest {
List<KeyValue<String, String>> windowedOutput = receiveMessages(
new StringDeserializer(),
- new StringDeserializer()
- , 10);
+ new StringDeserializer(),
+ 10);
Comparator<KeyValue<String, String>>
comparator =
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index e5560c1..383a793 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -97,11 +97,10 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration = new Properties();
final String applicationId = "kgrouped-stream-test-" + testNo;
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- streamsConfiguration
- .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
@@ -155,8 +154,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, String>> results = receiveMessages(
new StringDeserializer(),
- new StringDeserializer()
- , 10);
+ new StringDeserializer(),
+ 10);
Collections.sort(results, new Comparator<KeyValue<String, String>>() {
@Override
@@ -209,8 +208,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, String>> windowedOutput = receiveMessages(
new StringDeserializer(),
- new StringDeserializer()
- , 15);
+ new StringDeserializer(),
+ 15);
final Comparator<KeyValue<String, String>>
comparator =
@@ -263,8 +262,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, Integer>> results = receiveMessages(
new StringDeserializer(),
- new IntegerDeserializer()
- , 10);
+ new IntegerDeserializer(),
+ 10);
Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
@Override
@@ -313,8 +312,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
new StringDeserializer(),
- new IntegerDeserializer()
- , 15);
+ new IntegerDeserializer(),
+ 15);
final Comparator<KeyValue<String, Integer>>
comparator =
@@ -364,8 +363,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, Long>> results = receiveMessages(
new StringDeserializer(),
- new LongDeserializer()
- , 10);
+ new LongDeserializer(),
+ 10);
Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
@Override
public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -406,8 +405,8 @@ public class KStreamAggregationIntegrationTest {
final List<KeyValue<String, Long>> results = receiveMessages(
new StringDeserializer(),
- new LongDeserializer()
- , 10);
+ new LongDeserializer(),
+ 10);
Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
@Override
public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 5f85536..7848d1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -260,7 +260,7 @@ public class ResetIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
- streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e46a016..8a1e13a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -99,13 +100,14 @@ public class StreamPartitionAssignorTest {
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+ setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
}
};
}
@SuppressWarnings("unchecked")
@Test
- public void testSubscription() throws Exception {
+ public void testSubscription() {
StreamsConfig config = new StreamsConfig(configProps());
TopologyBuilder builder = new TopologyBuilder();
@@ -148,7 +150,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignBasic() throws Exception {
+ public void testAssignBasic() {
StreamsConfig config = new StreamsConfig(configProps());
TopologyBuilder builder = new TopologyBuilder();
@@ -215,7 +217,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithNewTasks() throws Exception {
+ public void testAssignWithNewTasks() {
StreamsConfig config = new StreamsConfig(configProps());
TopologyBuilder builder = new TopologyBuilder();
@@ -274,7 +276,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithStates() throws Exception {
+ public void testAssignWithStates() {
StreamsConfig config = new StreamsConfig(configProps());
String applicationId = "test";
TopologyBuilder builder = new TopologyBuilder();
@@ -334,7 +336,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithStandbyReplicas() throws Exception {
+ public void testAssignWithStandbyReplicas() {
Properties props = configProps();
props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
StreamsConfig config = new StreamsConfig(props);
@@ -449,7 +451,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testOnAssignment() throws Exception {
+ public void testOnAssignment() {
StreamsConfig config = new StreamsConfig(configProps());
TopicPartition t2p3 = new TopicPartition("topic2", 3);
@@ -482,7 +484,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithInternalTopics() throws Exception {
+ public void testAssignWithInternalTopics() {
StreamsConfig config = new StreamsConfig(configProps());
String applicationId = "test";
TopologyBuilder builder = new TopologyBuilder();
@@ -521,7 +523,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
+ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
StreamsConfig config = new StreamsConfig(configProps());
String applicationId = "test";
TopologyBuilder builder = new TopologyBuilder();
@@ -555,7 +557,7 @@ public class StreamPartitionAssignorTest {
subscriptions.put("consumer10",
new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
- Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+ partitionAssignor.assign(metadata, subscriptions);
// check prepared internal topics
assertEquals(2, internalTopicManager.readyTopics.size());
@@ -563,7 +565,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+ public void shouldAddUserDefinedEndPointToSubscription() {
final Properties properties = configProps();
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
final StreamsConfig config = new StreamsConfig(properties);
@@ -589,7 +591,70 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+ final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ final Set<TaskId> emptyTasks = Collections.emptySet();
+ subscriptions.put(
+ "consumer1",
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+ )
+ );
+ subscriptions.put(
+ "consumer2",
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+ )
+ );
+
+ final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ StreamsConfig config = new StreamsConfig(configProps());
+
+ final TopologyBuilder builder = new TopologyBuilder();
+ final StreamThread streamThread = new StreamThread(
+ builder,
+ config,
+ new MockClientSupplier(),
+ "applicationId",
+ "clientId",
+ UUID.randomUUID(),
+ new Metrics(),
+ new SystemTime(),
+ new StreamsMetadataState(builder));
+
+ partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", "clientId"));
+ final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+ assertEquals(2, assignment.size());
+ assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version);
+ assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version);
+ }
+
+ @Test
+ public void shouldDownGradeSubscription() {
+ final Properties properties = configProps();
+ properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
+ final StreamsConfig config = new StreamsConfig(properties);
+
+ final TopologyBuilder builder = new TopologyBuilder();
+ builder.addSource("source1", "topic1");
+
+ final String clientId = "client-id";
+ final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId,
+ UUID.randomUUID(), new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+ final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
+
+ final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+ assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version);
+ }
+
+ @Test
+ public void shouldMapUserEndPointToTopicPartitions() {
final Properties properties = configProps();
final String myEndPoint = "localhost:8080";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -628,7 +693,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
final Properties properties = configProps();
final String myEndPoint = "localhost";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -655,7 +720,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
final Properties properties = configProps();
final String myEndPoint = "localhost:j87yhk";
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -682,7 +747,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+ public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
final Map<HostInfo, Set<TopicPartition>> hostState =
@@ -696,7 +761,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldSetClusterMetadataOnAssignment() throws Exception {
+ public void shouldSetClusterMetadataOnAssignment() {
final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
final List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
@@ -718,7 +783,7 @@ public class StreamPartitionAssignorTest {
}
@Test
- public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+ public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
final Cluster cluster = partitionAssignor.clusterMetadata();
assertNotNull(cluster);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2f252e9..2ea5738 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -17,11 +17,6 @@
package org.apache.kafka.streams.processor.internals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -40,6 +35,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.io.File;
@@ -56,6 +52,11 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
public class StreamThreadTest {
private final String clientId = "clientId";
@@ -117,6 +118,7 @@ public class StreamThreadTest {
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+ setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
}
};
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ce94a23..6c94c18 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -65,10 +65,9 @@ public class AssignmentInfoTest {
assertEquals(oldVersion.activeTasks, decoded.activeTasks);
assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
- assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+ assertEquals(1, decoded.version);
}
-
/**
* This is a clone of what the V1 encoding did. The encode method has changed for V2
* so it is impossible to test compatibility without having this
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index f920c51..63ad01d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -105,7 +105,7 @@ public class SmokeTestClient extends SmokeTestUtil {
}
});
- data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
+ data.process(SmokeTestUtil.<String, Integer>printProcessorSupplier("data"));
// min
KGroupedStream<String, Integer>
@@ -131,7 +131,7 @@ public class SmokeTestClient extends SmokeTestUtil {
).to(stringSerde, intSerde, "min");
KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
- minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
+ minTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("min"));
// max
groupedData.aggregate(
@@ -153,7 +153,7 @@ public class SmokeTestClient extends SmokeTestUtil {
).to(stringSerde, intSerde, "max");
KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
- maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
+ maxTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("max"));
// sum
groupedData.aggregate(
@@ -176,7 +176,7 @@ public class SmokeTestClient extends SmokeTestUtil {
KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
- sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
+ sumTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("sum"));
// cnt
groupedData.count(UnlimitedWindows.of(), "uwin-cnt")
@@ -185,7 +185,7 @@ public class SmokeTestClient extends SmokeTestUtil {
).to(stringSerde, longSerde, "cnt");
KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
- cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
+ cntTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("cnt"));
// dif
maxTable.join(minTable,
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index f9d30d5..f103355 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -125,38 +125,48 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
- Properties props = new Properties();
+ return generate(kafka, numKeys, maxRecordsPerKey, true);
+ }
+
+ public static Map<String, Set<Integer>> generate(final String kafka,
+ final int numKeys,
+ final int maxRecordsPerKey,
+ final boolean autoTerminate) throws Exception {
+ final Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+ final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
int numRecordsProduced = 0;
- Map<String, Set<Integer>> allData = new HashMap<>();
- ValueList[] data = new ValueList[numKeys];
+ final Map<String, Set<Integer>> allData = new HashMap<>();
+ final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
allData.put(data[i].key, new HashSet<Integer>());
}
- Random rand = new Random();
+ final Random rand = new Random();
- int remaining = data.length;
+ int remaining = 1; // dummy value must be positive if <autoTerminate> is false
+ if (autoTerminate) {
+ remaining = data.length;
+ }
while (remaining > 0) {
- int index = rand.nextInt(remaining);
- String key = data[index].key;
+ final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
+ final String key = data[index].key;
int value = data[index].next();
- if (value < 0) {
+ if (autoTerminate && value < 0) {
remaining--;
data[index] = data[remaining];
value = END;
}
- ProducerRecord<byte[], byte[]> record =
+ final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
producer.send(record);
@@ -165,8 +175,9 @@ public class SmokeTestDriver extends SmokeTestUtil {
numRecordsProduced++;
allData.get(key).add(value);
- if (numRecordsProduced % 100 == 0)
+ if (numRecordsProduced % 100 == 0) {
System.out.println(numRecordsProduced + " records produced");
+ }
Thread.sleep(10);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 7ff738f..d9ad745 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -37,27 +37,20 @@ public class SmokeTestUtil {
public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
public final static int END = Integer.MAX_VALUE;
- public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic) {
- return printProcessorSupplier(topic, false);
- }
-
- public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) {
- return new ProcessorSupplier<String, T>() {
- public Processor<String, T> get() {
- return new AbstractProcessor<String, T>() {
+ public static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
+ return new ProcessorSupplier<K, V>() {
+ public Processor<K, V> get() {
+ return new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
- private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
- this.context = context;
}
@Override
- public void process(String key, T value) {
- if (printOffset) System.out.println(">>> " + context.offset());
+ public void process(K key, V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
@@ -65,12 +58,10 @@ public class SmokeTestUtil {
}
@Override
- public void punctuate(long timestamp) {
- }
+ public void punctuate(long timestamp) {}
@Override
- public void close() {
- }
+ public void close() {}
};
}
};
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
index c26544e..3328ae5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -24,7 +24,7 @@ import java.util.Set;
public class StreamsSmokeTest {
/**
- * args ::= command kafka zookeeper stateDir
+ * args ::= command kafka zookeeper stateDir disableAutoTerminate
* command := "run" | "process"
*
* @param args
@@ -34,12 +34,14 @@ public class StreamsSmokeTest {
String kafka = args.length > 1 ? args[1] : null;
String zookeeper = args.length > 2 ? args[2] : null;
String stateDir = args.length > 3 ? args[3] : null;
+ boolean disableAutoTerminate = args.length > 4;
- System.out.println("StreamsSmokeTest instance started");
+ System.out.println("StreamsTest instance started (StreamsSmokeTest)");
System.out.println("command=" + command);
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("stateDir=" + stateDir);
+ System.out.println("disableAutoTerminate=" + disableAutoTerminate);
switch (command) {
case "standalone":
@@ -49,8 +51,12 @@ public class StreamsSmokeTest {
// this starts the driver (data generation and result verification)
final int numKeys = 10;
final int maxRecordsPerKey = 500;
- Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
- SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+ if (disableAutoTerminate) {
+ SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
+ } else {
+ Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+ SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+ }
break;
case "process":
// this starts a KafkaStreams client
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..e771bea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.smoketest.SmokeTestUtil;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 3) {
+ System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] + " " : "")
+ + (args.length > 1 ? args[1] : ""));
+ }
+ final String kafka = args[0];
+ final String zookeeper = args[1];
+ final String stateDir = args[2];
+ final String upgradeFrom = args.length > 3 ? args[3] : null;
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("zookeeper=" + zookeeper);
+ System.out.println("stateDir=" + stateDir);
+ System.out.println("upgradeFrom=" + upgradeFrom);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ if (upgradeFrom != null) {
+ config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+ }
+
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 83a9092..1bedd87 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -345,7 +345,7 @@ public class ProcessorTopologyTestDriver {
// consumer.subscribe(new TopicPartition(topicName, 1));
// Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
List<PartitionInfo> partitionInfos = new ArrayList<>();
- partitionInfos.add(new PartitionInfo(topicName , id.partition, null, null, null));
+ partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
consumer.updatePartitions(topicName, partitionInfos);
consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
}
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 0000000..7d3ed43
--- /dev/null
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) {
+ if (args.length < 3) {
+ System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: "
+ + (args.length > 0 ? args[0] + " " : "")
+ + (args.length > 1 ? args[1] : ""));
+ }
+ final String kafka = args[0];
+ final String zookeeper = args[1];
+ final String stateDir = args[2];
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
+ System.out.println("kafka=" + kafka);
+ System.out.println("zookeeper=" + zookeeper);
+ System.out.println("stateDir=" + stateDir);
+
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream dataStream = builder.stream("data");
+ dataStream.process(printProcessorSupplier());
+ dataStream.to("echo");
+
+ final Properties config = new Properties();
+ config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+ config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+ final KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ System.out.println("closing Kafka Streams instance");
+ System.out.flush();
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }
+ });
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ return new ProcessorSupplier<K, V>() {
+ public Processor<K, V> get() {
+ return new AbstractProcessor<K, V>() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final K key, final V value) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ }
+ }
+
+ @Override
+ public void punctuate(final long timestamp) {}
+
+ @Override
+ public void close() {}
+ };
+ }
+ };
+ }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a63810e..b857bd5 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -33,6 +33,8 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
+ CLEAN_NODE_ENABLED = True
+
logs = {
"streams_log": {
"path": LOG_FILE,
@@ -43,6 +45,114 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
+ "streams_log.0-1": {
+ "path": LOG_FILE + ".0-1",
+ "collect_default": True},
+ "streams_stdout.0-1": {
+ "path": STDOUT_FILE + ".0-1",
+ "collect_default": True},
+ "streams_stderr.0-1": {
+ "path": STDERR_FILE + ".0-1",
+ "collect_default": True},
+ "streams_log.0-2": {
+ "path": LOG_FILE + ".0-2",
+ "collect_default": True},
+ "streams_stdout.0-2": {
+ "path": STDOUT_FILE + ".0-2",
+ "collect_default": True},
+ "streams_stderr.0-2": {
+ "path": STDERR_FILE + ".0-2",
+ "collect_default": True},
+ "streams_log.0-3": {
+ "path": LOG_FILE + ".0-3",
+ "collect_default": True},
+ "streams_stdout.0-3": {
+ "path": STDOUT_FILE + ".0-3",
+ "collect_default": True},
+ "streams_stderr.0-3": {
+ "path": STDERR_FILE + ".0-3",
+ "collect_default": True},
+ "streams_log.0-4": {
+ "path": LOG_FILE + ".0-4",
+ "collect_default": True},
+ "streams_stdout.0-4": {
+ "path": STDOUT_FILE + ".0-4",
+ "collect_default": True},
+ "streams_stderr.0-4": {
+ "path": STDERR_FILE + ".0-4",
+ "collect_default": True},
+ "streams_log.0-5": {
+ "path": LOG_FILE + ".0-5",
+ "collect_default": True},
+ "streams_stdout.0-5": {
+ "path": STDOUT_FILE + ".0-5",
+ "collect_default": True},
+ "streams_stderr.0-5": {
+ "path": STDERR_FILE + ".0-5",
+ "collect_default": True},
+ "streams_log.0-6": {
+ "path": LOG_FILE + ".0-6",
+ "collect_default": True},
+ "streams_stdout.0-6": {
+ "path": STDOUT_FILE + ".0-6",
+ "collect_default": True},
+ "streams_stderr.0-6": {
+ "path": STDERR_FILE + ".0-6",
+ "collect_default": True},
+ "streams_log.1-1": {
+ "path": LOG_FILE + ".1-1",
+ "collect_default": True},
+ "streams_stdout.1-1": {
+ "path": STDOUT_FILE + ".1-1",
+ "collect_default": True},
+ "streams_stderr.1-1": {
+ "path": STDERR_FILE + ".1-1",
+ "collect_default": True},
+ "streams_log.1-2": {
+ "path": LOG_FILE + ".1-2",
+ "collect_default": True},
+ "streams_stdout.1-2": {
+ "path": STDOUT_FILE + ".1-2",
+ "collect_default": True},
+ "streams_stderr.1-2": {
+ "path": STDERR_FILE + ".1-2",
+ "collect_default": True},
+ "streams_log.1-3": {
+ "path": LOG_FILE + ".1-3",
+ "collect_default": True},
+ "streams_stdout.1-3": {
+ "path": STDOUT_FILE + ".1-3",
+ "collect_default": True},
+ "streams_stderr.1-3": {
+ "path": STDERR_FILE + ".1-3",
+ "collect_default": True},
+ "streams_log.1-4": {
+ "path": LOG_FILE + ".1-4",
+ "collect_default": True},
+ "streams_stdout.1-4": {
+ "path": STDOUT_FILE + ".1-4",
+ "collect_default": True},
+ "streams_stderr.1-4": {
+ "path": STDERR_FILE + ".1-4",
+ "collect_default": True},
+ "streams_log.1-5": {
+ "path": LOG_FILE + ".1-5",
+ "collect_default": True},
+ "streams_stdout.1-5": {
+ "path": STDOUT_FILE + ".1-5",
+ "collect_default": True},
+ "streams_stderr.1-5": {
+ "path": STDERR_FILE + ".1-5",
+ "collect_default": True},
+ "streams_log.1-6": {
+ "path": LOG_FILE + ".1-6",
+ "collect_default": True},
+ "streams_stdout.1-6": {
+ "path": STDOUT_FILE + ".1-6",
+ "collect_default": True},
+ "streams_stderr.1-6": {
+ "path": STDERR_FILE + ".1-6",
+ "collect_default": True},
}
def __init__(self, context, kafka, command):
@@ -95,7 +205,8 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
- node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+ if self.CLEAN_NODE_ENABLED:
+ node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
def start_cmd(self, node):
args = self.args.copy()
@@ -120,10 +231,10 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
- self.logger.info("Starting StreamsSmokeTest process on " + str(node.account))
+ self.logger.info("Starting StreamsTest process on " + str(node.account))
with node.account.monitor_log(self.STDOUT_FILE) as monitor:
node.account.ssh(self.start_cmd(node))
- monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account))
+ monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
@@ -132,8 +243,62 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
def __init__(self, context, kafka):
super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run")
+ self.DISABLE_AUTO_TERMINATE = ""
+ def disable_auto_terminate(self):
+ self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['kafka'] = self.kafka.bootstrap_servers()
+ args['zk'] = self.kafka.zk.connect_setting()
+ args['state_dir'] = self.PERSISTENT_ROOT
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+ cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.smoketest.StreamsSmokeTest " \
+ " %(command)s %(kafka)s %(zk)s %(state_dir)s %(disable_auto_terminate)s " \
+ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+ return cmd
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, context, kafka):
super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")
+
+class StreamsUpgradeTestJobRunnerService(StreamsSmokeTestBaseService):
+ def __init__(self, context, kafka):
+ super(StreamsUpgradeTestJobRunnerService, self).__init__(context, kafka, "")
+ self.UPGRADE_FROM = ""
+
+ def set_version(self, kafka_streams_version):
+ self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+ def set_upgrade_from(self, upgrade_from):
+ self.UPGRADE_FROM = upgrade_from
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['kafka'] = self.kafka.bootstrap_servers()
+ args['zk'] = self.kafka.zk.connect_setting()
+ args['state_dir'] = self.PERSISTENT_ROOT
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['version'] = self.KAFKA_STREAMS_VERSION
+ args['upgrade_from'] = self.UPGRADE_FROM
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+ cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ " INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+ " %(kafka_run_class)s org.apache.kafka.streams.tests.StreamsUpgradeTest " \
+ " %(kafka)s %(zk)s %(state_dir)s %(upgrade_from)s " \
+ " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+ return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 0000000..8266e07
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsUpgradeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_0, TRUNK_VERSION
+import random
+
+class StreamsUpgradeTest(KafkaTest):
+ """
+ Test upgrading Kafka Streams from 0.10.0.x to 0.10.1.x (ie, TRUNK)
+ """
+
+ def __init__(self, test_context):
+ super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+ 'echo' : { 'partitions': 5 },
+ 'data' : { 'partitions': 5 }
+ })
+
+ self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+ self.driver.disable_auto_terminate()
+ self.processor1 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+ self.processor2 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+ self.processor3 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+
+ def test_upgrade(self):
+ """
+ Starts 3 KafkaStreams instances with version 0.10.0, and upgrades one-by-one to 0.10.1
+ """
+
+ self.driver.start()
+ self.start_all_nodes_with_0100()
+
+ self.processors = [self.processor1, self.processor2, self.processor3]
+
+ counter = 1
+ random.seed()
+
+ # first rolling bounce
+ random.shuffle(self.processors)
+ for p in self.processors:
+ p.CLEAN_NODE_ENABLED = False
+ self.do_rolling_bounce(p, "0.10.0", counter)
+ counter = counter + 1
+
+ # second rolling bounce
+ random.shuffle(self.processors)
+ for p in self.processors:
+ self.do_rolling_bounce(p, "", counter)
+ counter = counter + 1
+
+ # shutdown
+ self.driver.stop()
+ self.driver.wait()
+
+ random.shuffle(self.processors)
+ for p in self.processors:
+ node = p.node
+ with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+ p.stop()
+ monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+ timeout_sec=60,
+ err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+ self.driver.stop()
+
+ def start_all_nodes_with_0100(self):
+ # start first with 0.10.0
+ self.prepare_for_0100(self.processor1)
+ node1 = self.processor1.node
+ with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+ with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
+ self.processor1.start()
+ log_monitor.wait_until("Kafka version : 0.10.0.1",
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams version 0.10.0.1" + str(node1.account))
+ monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+
+ # start second with 0.10.0
+ self.prepare_for_0100(self.processor2)
+ node2 = self.processor2.node
+ with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+ with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+ with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
+ self.processor2.start()
+ log_monitor.wait_until("Kafka version : 0.10.0.1",
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams version 0.10.0.1" + str(node2.account))
+ first_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+ second_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+
+ # start third with 0.10.0
+ self.prepare_for_0100(self.processor3)
+ node3 = self.processor3.node
+ with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+ with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+ with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
+ with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
+ self.processor3.start()
+ log_monitor.wait_until("Kafka version : 0.10.0.1",
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams version 0.10.0.1" + str(node3.account))
+ first_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node1.account))
+ second_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node2.account))
+ third_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node3.account))
+
+ @staticmethod
+ def prepare_for_0100(processor):
+ processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+ processor.set_version(str(LATEST_0_10_0))
+
+ def do_rolling_bounce(self, processor, upgrade_from, counter):
+ first_other_processor = None
+ second_other_processor = None
+ for p in self.processors:
+ if p != processor:
+ if first_other_processor is None:
+ first_other_processor = p
+ else:
+ second_other_processor = p
+
+ node = processor.node
+ first_other_node = first_other_processor.node
+ second_other_node = second_other_processor.node
+
+ # stop processor and wait for rebalance of others
+ with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+ with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+ processor.stop()
+ first_other_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+ second_other_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+ node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+ if upgrade_from == "": # upgrade disabled -- second round of rolling bounces
+ roll_counter = ".1-" # second round of rolling bounces
+ else:
+ roll_counter = ".0-" # first round of rolling boundes
+
+ node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+ node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+ node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
+
+ processor.set_version("") # set to TRUNK
+ processor.set_upgrade_from(upgrade_from)
+
+ grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
+ with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+ with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+ with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+ processor.start()
+
+ log_monitor.wait_until("Kafka version : " + str(TRUNK_VERSION),
+ timeout_sec=60,
+ err_msg="Could not detect Kafka Streams version " + str(TRUNK_VERSION) + " " + str(node.account))
+ first_other_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(first_other_node.account))
+ found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
+ if len(found) > 0:
+ raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+ second_other_monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(second_other_node.account))
+ found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
+ if len(found) > 0:
+ raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+ monitor.wait_until("processed 100 records from topic",
+ timeout_sec=60,
+ err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
\ No newline at end of file
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 239a9f4..ebf5ecf 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -64,6 +64,7 @@ def get_version(node=None):
return TRUNK
TRUNK = KafkaVersion("trunk")
+TRUNK_VERSION = KafkaVersion("0.10.1.2-SNAPSHOT")
# 0.8.2.X versions
V_0_8_2_1 = KafkaVersion("0.8.2.1")
@@ -78,7 +79,11 @@ LATEST_0_9 = V_0_9_0_1
# 0.10.0.X versions
V_0_10_0_0 = KafkaVersion("0.10.0.0")
V_0_10_0_1 = KafkaVersion("0.10.0.1")
-# Adding 0.10.0 as the next version will be 0.10.1.x
LATEST_0_10_0 = V_0_10_0_1
-LATEST_0_10 = LATEST_0_10_0
\ No newline at end of file
+# 0.10.1.X versions
+V_0_10_1_0 = KafkaVersion("0.10.1.0")
+V_0_10_1_1 = KafkaVersion("0.10.1.1")
+LATEST_0_10_1 = V_0_10_1_1
+
+LATEST_0_10 = LATEST_0_10_1
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 88878dc..b8cde3a 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -52,6 +52,8 @@ get_kafka() {
kafka_dir=/opt/kafka-$version
url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz
+ # the .tgz above does not include the streams test jar hence we need to get it separately
+ url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
if [ ! -d /opt/kafka-$version ]; then
pushd /tmp
curl -O $url
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.