You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/22 05:16:22 UTC
kafka git commit: KAFKA-4923: Modify compatibility test for
Exactly-Once Semantics in Streams
Repository: kafka
Updated Branches:
refs/heads/trunk dc10b0ea0 -> 495836a4a
KAFKA-4923: Modify compatibility test for Exactly-Once Semantics in Streams
- add broker compatibility system tests
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy, Eno Thereska, Guozhang Wang
Closes #2974 from mjsax/kafka-4923-add-eos-to-streams-add-broker-check-and-system-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/495836a4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/495836a4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/495836a4
Branch: refs/heads/trunk
Commit: 495836a4a27c2219f72a5f3776ba8e5216493b41
Parents: dc10b0e
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sun May 21 22:16:18 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun May 21 22:16:18 2017 -0700
----------------------------------------------------------------------
checkstyle/checkstyle.xml | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 4 +-
.../processor/internals/StreamsKafkaClient.java | 19 ++++++-
.../streams/tests/BrokerCompatibilityTest.java | 21 +++++--
tests/kafkatest/services/streams.py | 4 +-
.../streams_broker_compatibility_test.py | 59 +++++++++++++-------
tests/kafkatest/version.py | 7 ++-
vagrant/base.sh | 2 +
8 files changed, 86 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index cf57a50..743c68d 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -109,7 +109,7 @@
</module>
<module name="BooleanExpressionComplexity">
<!-- default is 3 -->
- <property name="max" value="4"/>
+ <property name="max" value="5"/>
</module>
<module name="ClassFanOutComplexity">
http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 74f5fc1..3b801a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -73,6 +73,8 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -409,7 +411,7 @@ public class KafkaStreams {
private void checkBrokerVersionCompatibility() throws StreamsException {
final StreamsKafkaClient client = new StreamsKafkaClient(config);
- client.checkBrokerCompatibility();
+ client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG)));
try {
client.close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index c2b76f9..44f7900 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -61,6 +61,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
+
public class StreamsKafkaClient {
private static final ConfigDef CONFIG = StreamsConfig.configDef()
@@ -310,7 +313,7 @@ public class StreamsKafkaClient {
*
* @throws StreamsException if brokers have version 0.10.0.x
*/
- public void checkBrokerCompatibility() throws StreamsException {
+ public void checkBrokerCompatibility(final boolean eosEnabled) throws StreamsException {
final ClientRequest clientRequest = kafkaClient.newClientRequest(
getAnyReadyBrokerId(),
new ApiVersionsRequest.Builder(),
@@ -331,5 +334,19 @@ public class StreamsKafkaClient {
if (apiVersionsResponse.apiVersion(ApiKeys.CREATE_TOPICS.id) == null) {
throw new StreamsException("Kafka Streams requires broker version 0.10.1.x or higher.");
}
+
+ if (eosEnabled && !brokerSupportsTransactions(apiVersionsResponse)) {
+ throw new StreamsException("Setting " + PROCESSING_GUARANTEE_CONFIG + "=" + EXACTLY_ONCE + " requires broker version 0.11.0.x or higher.");
+ }
}
+
+ private boolean brokerSupportsTransactions(final ApiVersionsResponse apiVersionsResponse) {
+ return apiVersionsResponse.apiVersion(ApiKeys.INIT_PRODUCER_ID.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.ADD_PARTITIONS_TO_TXN.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.ADD_OFFSETS_TO_TXN.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.END_TXN.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.WRITE_TXN_MARKERS.id) != null
+ && apiVersionsResponse.apiVersion(ApiKeys.TXN_OFFSET_COMMIT.id) != null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index c04b3d1..0af2594 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -33,6 +34,7 @@ import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.util.Collections;
+import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -41,11 +43,12 @@ public class BrokerCompatibilityTest {
private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";
- public static void main(String[] args) throws Exception {
+ public static void main(final String[] args) throws Exception {
System.out.println("StreamsTest instance started");
final String kafka = args.length > 0 ? args[0] : "localhost:9092";
final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+ final boolean eosEnabled = args.length > 2 ? Boolean.parseBoolean(args[2]) : false;
final File stateDir = new File(stateDirStr);
stateDir.mkdir();
@@ -58,6 +61,9 @@ public class BrokerCompatibilityTest {
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ if (eosEnabled) {
+ streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ }
final int timeout = 6000;
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), timeout);
@@ -70,7 +76,7 @@ public class BrokerCompatibilityTest {
final KafkaStreams streams = new KafkaStreams(builder, streamsProperties);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
- public void uncaughtException(Thread t, Throwable e) {
+ public void uncaughtException(final Thread t, final Throwable e) {
System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
streams.close(30, TimeUnit.SECONDS);
@@ -91,27 +97,30 @@ public class BrokerCompatibilityTest {
System.out.println("wait for result");
- loopUntilRecordReceived(kafka);
+ loopUntilRecordReceived(kafka, eosEnabled);
System.out.println("close Kafka Streams");
streams.close();
}
- private static void loopUntilRecordReceived(final String kafka) {
+ private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ if (eosEnabled) {
+ consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Collections.singletonList(SINK_TOPIC));
while (true) {
- ConsumerRecords<String, String> records = consumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
+ final ConsumerRecords<String, String> records = consumer.poll(100);
+ for (final ConsumerRecord<String, String> record : records) {
if (record.key().equals("key") && record.value().equals("value")) {
consumer.close();
return;
http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e7be947..905320a 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -166,8 +166,8 @@ class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
class StreamsBrokerCompatibilityService(StreamsTestBaseService):
- def __init__(self, test_context, kafka):
+ def __init__(self, test_context, kafka, eosEnabled):
super(StreamsBrokerCompatibilityService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
- "dummy")
+ eosEnabled)
http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index a5bdbc6..c4b554e 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -21,14 +21,16 @@ from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
class StreamsBrokerCompatibility(Test):
"""
- These tests validate that Streams v0.10.2+ can connect to older brokers v0.10.1+
- and that Streams fails fast for 0.10.0 brokers
- and that Streams times-out for pre-0.10.0 brokers
+ These tests validates that
+ - Streams 0.11+ w/ EOS fails fast for older brokers 0.10.2 and 0.10.1
+ - Streams 0.11+ w/o EOS works for older brokers 0.10.2 and 0.10.1
+ - Streams fails fast for 0.10.0 brokers
+ - Streams times-out for pre-0.10.0 brokers
"""
input = "brokerCompatibilitySourceTopic"
@@ -36,7 +38,6 @@ class StreamsBrokerCompatibility(Test):
def __init__(self, test_context):
super(StreamsBrokerCompatibility, self).__init__(test_context=test_context)
-
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context,
num_nodes=1,
@@ -45,9 +46,6 @@ class StreamsBrokerCompatibility(Test):
self.input: {'partitions': 1, 'replication-factor': 1},
self.output: {'partitions': 1, 'replication-factor': 1}
})
-
- self.processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka)
-
self.consumer = VerifiableConsumer(test_context,
1,
self.kafka,
@@ -57,16 +55,35 @@ class StreamsBrokerCompatibility(Test):
def setUp(self):
self.zk.start()
- @parametrize(broker_version=str(DEV_BRANCH))
+ @parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_10_1))
- def test_compatible_brokers(self, broker_version):
+ def test_fail_fast_on_incompatible_brokers_if_eos_enabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
- self.processor.start()
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, True)
+ processor.start()
+
+ processor.node.account.ssh(processor.start_cmd(processor.node))
+ with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
+ monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Setting processing.guarantee=exactly_once requires broker version 0.11.0.x or higher.',
+ timeout_sec=60,
+ err_msg="Never saw 'EOS requires broker version 0.11+' error message " + str(processor.node.account))
+
+ self.kafka.stop()
+
+ @parametrize(broker_version=str(LATEST_0_10_2))
+ @parametrize(broker_version=str(LATEST_0_10_1))
+ def test_compatible_brokers_eos_disabled(self, broker_version):
+ self.kafka.set_version(KafkaVersion(broker_version))
+ self.kafka.start()
+
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+ processor.start()
+
self.consumer.start()
- self.processor.wait()
+ processor.wait()
wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")
@@ -78,13 +95,14 @@ class StreamsBrokerCompatibility(Test):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
- self.processor.start()
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+ processor.start()
- self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node))
- with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor:
+ processor.node.account.ssh(processor.start_cmd(processor.node))
+ with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Kafka Streams requires broker version 0.10.1.x or higher.',
timeout_sec=60,
- err_msg="Never saw 'incompatible broker' error message " + str(self.processor.node.account))
+ err_msg="Never saw 'Streams requires broker verion 0.10.1+' error message " + str(processor.node.account))
self.kafka.stop()
@@ -94,12 +112,13 @@ class StreamsBrokerCompatibility(Test):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
- self.processor.start()
+ processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+ processor.start()
- self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node))
- with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor:
+ processor.node.account.ssh(processor.start_cmd(processor.node))
+ with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException: Could not find any available broker.',
timeout_sec=60,
- err_msg="Never saw 'no available broker' error message " + str(self.processor.node.account))
+ err_msg="Never saw 'no available brokers' error message " + str(processor.node.account))
self.kafka.stop()
http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/tests/kafkatest/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 7cd489d..8e1497c 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -82,4 +82,9 @@ V_0_10_1_0 = KafkaVersion("0.10.1.0")
V_0_10_1_1 = KafkaVersion("0.10.1.1")
LATEST_0_10_1 = V_0_10_1_1
-LATEST_0_10 = LATEST_0_10_1
+# 0.10.2.x versions
+V_0_10_2_0 = KafkaVersion("0.10.2.0")
+V_0_10_2_1 = KafkaVersion("0.10.2.1")
+LATEST_0_10_2 = V_0_10_2_1
+
+LATEST_0_10 = LATEST_0_10_2
http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 28dcf69..100891b 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -91,6 +91,8 @@ get_kafka 0.10.0.1 2.11
chmod a+rw /opt/kafka-0.10.0.1
get_kafka 0.10.1.1 2.11
chmod a+rw /opt/kafka-0.10.1.1
+get_kafka 0.10.2.1
+chmod a+rw /opt/kafka-0.10.2.1
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local