You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/22 05:16:22 UTC

kafka git commit: KAFKA-4923: Modify compatibility test for Exactly-Once Semantics in Streams

Repository: kafka
Updated Branches:
  refs/heads/trunk dc10b0ea0 -> 495836a4a


KAFKA-4923: Modify compatibility test for Exactly-Once Semantics in Streams

 - add broker compatibility system tests

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #2974 from mjsax/kafka-4923-add-eos-to-streams-add-broker-check-and-system-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/495836a4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/495836a4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/495836a4

Branch: refs/heads/trunk
Commit: 495836a4a27c2219f72a5f3776ba8e5216493b41
Parents: dc10b0e
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sun May 21 22:16:18 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun May 21 22:16:18 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |  2 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |  4 +-
 .../processor/internals/StreamsKafkaClient.java | 19 ++++++-
 .../streams/tests/BrokerCompatibilityTest.java  | 21 +++++--
 tests/kafkatest/services/streams.py             |  4 +-
 .../streams_broker_compatibility_test.py        | 59 +++++++++++++-------
 tests/kafkatest/version.py                      |  7 ++-
 vagrant/base.sh                                 |  2 +
 8 files changed, 86 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index cf57a50..743c68d 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -109,7 +109,7 @@
     </module>
     <module name="BooleanExpressionComplexity">
       <!-- default is 3 -->
-      <property name="max" value="4"/>
+      <property name="max" value="5"/>
     </module>
 
     <module name="ClassFanOutComplexity">

http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 74f5fc1..3b801a9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -73,6 +73,8 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
 
 /**
  * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -409,7 +411,7 @@ public class KafkaStreams {
     private void checkBrokerVersionCompatibility() throws StreamsException {
         final StreamsKafkaClient client = new StreamsKafkaClient(config);
 
-        client.checkBrokerCompatibility();
+        client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG)));
 
         try {
             client.close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index c2b76f9..44f7900 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -61,6 +61,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
+
 public class StreamsKafkaClient {
 
     private static final ConfigDef CONFIG = StreamsConfig.configDef()
@@ -310,7 +313,7 @@ public class StreamsKafkaClient {
      *
      * @throws StreamsException if brokers have version 0.10.0.x
      */
-    public void checkBrokerCompatibility() throws StreamsException {
+    public void checkBrokerCompatibility(final boolean eosEnabled) throws StreamsException {
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
             getAnyReadyBrokerId(),
             new ApiVersionsRequest.Builder(),
@@ -331,5 +334,19 @@ public class StreamsKafkaClient {
         if (apiVersionsResponse.apiVersion(ApiKeys.CREATE_TOPICS.id) == null) {
             throw new StreamsException("Kafka Streams requires broker version 0.10.1.x or higher.");
         }
+
+        if (eosEnabled && !brokerSupportsTransactions(apiVersionsResponse)) {
+            throw new StreamsException("Setting " + PROCESSING_GUARANTEE_CONFIG + "=" + EXACTLY_ONCE + " requires broker version 0.11.0.x or higher.");
+        }
     }
+
+    private boolean brokerSupportsTransactions(final ApiVersionsResponse apiVersionsResponse) {
+        return apiVersionsResponse.apiVersion(ApiKeys.INIT_PRODUCER_ID.id) != null
+            && apiVersionsResponse.apiVersion(ApiKeys.ADD_PARTITIONS_TO_TXN.id) != null
+            && apiVersionsResponse.apiVersion(ApiKeys.ADD_OFFSETS_TO_TXN.id) != null
+            && apiVersionsResponse.apiVersion(ApiKeys.END_TXN.id) != null
+            && apiVersionsResponse.apiVersion(ApiKeys.WRITE_TXN_MARKERS.id) != null
+            && apiVersionsResponse.apiVersion(ApiKeys.TXN_OFFSET_COMMIT.id) != null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index c04b3d1..0af2594 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -33,6 +34,7 @@ import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -41,11 +43,12 @@ public class BrokerCompatibilityTest {
     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
     private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";
 
-    public static void main(String[] args) throws Exception {
+    public static void main(final String[] args) throws Exception {
         System.out.println("StreamsTest instance started");
 
         final String kafka = args.length > 0 ? args[0] : "localhost:9092";
         final String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
+        final boolean eosEnabled = args.length > 2 ? Boolean.parseBoolean(args[2]) : false;
 
         final File stateDir = new File(stateDirStr);
         stateDir.mkdir();
@@ -58,6 +61,9 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        if (eosEnabled) {
+            streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        }
         final int timeout = 6000;
         streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout);
         streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG), timeout);
@@ -70,7 +76,7 @@ public class BrokerCompatibilityTest {
         final KafkaStreams streams = new KafkaStreams(builder, streamsProperties);
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
-            public void uncaughtException(Thread t, Throwable e) {
+            public void uncaughtException(final Thread t, final Throwable e) {
                 System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
 
                 streams.close(30, TimeUnit.SECONDS);
@@ -91,27 +97,30 @@ public class BrokerCompatibilityTest {
 
 
         System.out.println("wait for result");
-        loopUntilRecordReceived(kafka);
+        loopUntilRecordReceived(kafka, eosEnabled);
 
 
         System.out.println("close Kafka Streams");
         streams.close();
     }
 
-    private static void loopUntilRecordReceived(final String kafka) {
+    private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
         final Properties consumerProperties = new Properties();
         consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer");
         consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        if (eosEnabled) {
+            consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+        }
 
         final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
         consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
         while (true) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            for (ConsumerRecord<String, String> record : records) {
+            final ConsumerRecords<String, String> records = consumer.poll(100);
+            for (final ConsumerRecord<String, String> record : records) {
                 if (record.key().equals("key") && record.value().equals("value")) {
                     consumer.close();
                     return;

http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index e7be947..905320a 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -166,8 +166,8 @@ class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
 
 
 class StreamsBrokerCompatibilityService(StreamsTestBaseService):
-    def __init__(self, test_context, kafka):
+    def __init__(self, test_context, kafka, eosEnabled):
         super(StreamsBrokerCompatibilityService, self).__init__(test_context,
                                                                 kafka,
                                                                 "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
-                                                                "dummy")
+                                                                eosEnabled)

http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index a5bdbc6..c4b554e 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -21,14 +21,16 @@ from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsBrokerCompatibilityService
 from kafkatest.services.verifiable_consumer import VerifiableConsumer
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, KafkaVersion
 
 
 class StreamsBrokerCompatibility(Test):
     """
-    These tests validate that Streams v0.10.2+ can connect to older brokers v0.10.1+
-    and that Streams fails fast for 0.10.0 brokers
-    and that Streams times-out for pre-0.10.0 brokers
+    These tests validate that
+    - Streams 0.11+ w/ EOS fails fast for older brokers 0.10.2 and 0.10.1
+    - Streams 0.11+ w/o EOS works for older brokers 0.10.2 and 0.10.1
+    - Streams fails fast for 0.10.0 brokers
+    - Streams times-out for pre-0.10.0 brokers
     """
 
     input = "brokerCompatibilitySourceTopic"
@@ -36,7 +38,6 @@ class StreamsBrokerCompatibility(Test):
 
     def __init__(self, test_context):
         super(StreamsBrokerCompatibility, self).__init__(test_context=test_context)
-
         self.zk = ZookeeperService(test_context, num_nodes=1)
         self.kafka = KafkaService(test_context,
                                   num_nodes=1,
@@ -45,9 +46,6 @@ class StreamsBrokerCompatibility(Test):
                                       self.input: {'partitions': 1, 'replication-factor': 1},
                                       self.output: {'partitions': 1, 'replication-factor': 1}
                                   })
-
-        self.processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka)
-
         self.consumer = VerifiableConsumer(test_context,
                                            1,
                                            self.kafka,
@@ -57,16 +55,35 @@ class StreamsBrokerCompatibility(Test):
     def setUp(self):
         self.zk.start()
 
-    @parametrize(broker_version=str(DEV_BRANCH))
+    @parametrize(broker_version=str(LATEST_0_10_2))
     @parametrize(broker_version=str(LATEST_0_10_1))
-    def test_compatible_brokers(self, broker_version):
+    def test_fail_fast_on_incompatible_brokers_if_eos_enabled(self, broker_version):
         self.kafka.set_version(KafkaVersion(broker_version))
         self.kafka.start()
 
-        self.processor.start()
+        processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, True)
+        processor.start()
+
+        processor.node.account.ssh(processor.start_cmd(processor.node))
+        with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
+            monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Setting processing.guarantee=exactly_once requires broker version 0.11.0.x or higher.',
+                               timeout_sec=60,
+                               err_msg="Never saw 'EOS requires broker version 0.11+' error message " + str(processor.node.account))
+
+        self.kafka.stop()
+
+    @parametrize(broker_version=str(LATEST_0_10_2))
+    @parametrize(broker_version=str(LATEST_0_10_1))
+    def test_compatible_brokers_eos_disabled(self, broker_version):
+        self.kafka.set_version(KafkaVersion(broker_version))
+        self.kafka.start()
+
+        processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+        processor.start()
+
         self.consumer.start()
 
-        self.processor.wait()
+        processor.wait()
 
         wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")
 
@@ -78,13 +95,14 @@ class StreamsBrokerCompatibility(Test):
         self.kafka.set_version(KafkaVersion(broker_version))
         self.kafka.start()
 
-        self.processor.start()
+        processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+        processor.start()
 
-        self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node))
-        with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor:
+        processor.node.account.ssh(processor.start_cmd(processor.node))
+        with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
             monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: Kafka Streams requires broker version 0.10.1.x or higher.',
                         timeout_sec=60,
-                        err_msg="Never saw 'incompatible broker' error message " + str(self.processor.node.account))
+                        err_msg="Never saw 'Streams requires broker verion 0.10.1+' error message " + str(processor.node.account))
 
         self.kafka.stop()
 
@@ -94,12 +112,13 @@ class StreamsBrokerCompatibility(Test):
         self.kafka.set_version(KafkaVersion(broker_version))
         self.kafka.start()
 
-        self.processor.start()
+        processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
+        processor.start()
 
-        self.processor.node.account.ssh(self.processor.start_cmd(self.processor.node))
-        with self.processor.node.account.monitor_log(self.processor.STDERR_FILE) as monitor:
+        processor.node.account.ssh(processor.start_cmd(processor.node))
+        with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
             monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException: Could not find any available broker.',
                                timeout_sec=60,
-                               err_msg="Never saw 'no available broker' error message " + str(self.processor.node.account))
+                               err_msg="Never saw 'no available brokers' error message " + str(processor.node.account))
 
         self.kafka.stop()

http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/tests/kafkatest/version.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 7cd489d..8e1497c 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -82,4 +82,9 @@ V_0_10_1_0 = KafkaVersion("0.10.1.0")
 V_0_10_1_1 = KafkaVersion("0.10.1.1")
 LATEST_0_10_1 = V_0_10_1_1
 
-LATEST_0_10 = LATEST_0_10_1
+# 0.10.2.x versions
+V_0_10_2_0 = KafkaVersion("0.10.2.0")
+V_0_10_2_1 = KafkaVersion("0.10.2.1")
+LATEST_0_10_2 = V_0_10_2_1
+
+LATEST_0_10 = LATEST_0_10_2

http://git-wip-us.apache.org/repos/asf/kafka/blob/495836a4/vagrant/base.sh
----------------------------------------------------------------------
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 28dcf69..100891b 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -91,6 +91,8 @@ get_kafka 0.10.0.1 2.11
 chmod a+rw /opt/kafka-0.10.0.1
 get_kafka 0.10.1.1 2.11
 chmod a+rw /opt/kafka-0.10.1.1
+get_kafka 0.10.2.1
+chmod a+rw /opt/kafka-0.10.2.1
 
 
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local