You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/21 23:57:57 UTC

[kafka] branch 1.1 updated: MINOR: Fix Streams-Broker-Compatibility system test (#4594)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new b3a9307  MINOR: Fix Streams-Broker-Compatibility system test (#4594)
b3a9307 is described below

commit b3a93073ccb66a711644a62d097619121005b511
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Feb 21 15:53:46 2018 -0800

    MINOR: Fix Streams-Broker-Compatibility system test (#4594)
    
    fixes error message handling for test consumer client and KafkaStreams instance
    updates expected error message
    fixes race condition in system test code and avoids starting Streams processor twice
    
    Author: Matthias J. Sax <ma...@confluent.io.>
    
    Reviewer: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../streams/tests/BrokerCompatibilityTest.java     | 48 +++++++++++++---------
 .../streams/streams_broker_compatibility_test.py   | 17 ++++----
 2 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index b308782..af76304 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.test.TestUtils;
@@ -93,7 +94,13 @@ public class BrokerCompatibilityTest {
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
-                System.err.println("FATAL: An unexpected exception " + e);
+                Throwable cause = e;
+                if (cause instanceof StreamsException) {
+                    while (cause.getCause() != null) {
+                        cause = cause.getCause();
+                    }
+                }
+                System.err.println("FATAL: An unexpected exception " + cause);
                 e.printStackTrace(System.err);
                 System.err.flush();
                 streams.close(30, TimeUnit.SECONDS);
@@ -109,17 +116,20 @@ public class BrokerCompatibilityTest {
         producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
-        final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
-        producer.send(new ProducerRecord<>(SOURCE_TOPIC, "key", "value"));
-
-
-        System.out.println("wait for result");
-        loopUntilRecordReceived(kafka, eosEnabled);
+        try {
+            try (final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);) {
+                producer.send(new ProducerRecord<>(SOURCE_TOPIC, "key", "value"));
 
-
-        System.out.println("close Kafka Streams");
-        producer.close();
-        streams.close();
+                System.out.println("wait for result");
+                loopUntilRecordReceived(kafka, eosEnabled);
+                System.out.println("close Kafka Streams");
+                streams.close();
+            }
+        } catch (final RuntimeException e) {
+            System.err.println("Non-Streams exception occurred: ");
+            e.printStackTrace(System.err);
+            System.err.flush();
+        }
     }
 
     private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
@@ -133,15 +143,15 @@ public class BrokerCompatibilityTest {
             consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
         }
 
-        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
-        consumer.subscribe(Collections.singletonList(SINK_TOPIC));
+        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
+            consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
-        while (true) {
-            final ConsumerRecords<String, String> records = consumer.poll(100);
-            for (final ConsumerRecord<String, String> record : records) {
-                if (record.key().equals("key") && record.value().equals("1")) {
-                    consumer.close();
-                    return;
+            while (true) {
+                final ConsumerRecords<String, String> records = consumer.poll(100);
+                for (final ConsumerRecord<String, String> record : records) {
+                    if (record.key().equals("key") && record.value().equals("1")) {
+                        return;
+                    }
                 }
             }
         }
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index b00b9bb..5370a39 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -63,13 +63,12 @@ class StreamsBrokerCompatibility(Test):
         self.kafka.start()
 
         processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, True)
-        processor.start()
 
-        processor.node.account.ssh(processor.start_cmd(processor.node))
         with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
-            monitor.wait_until("Exception in thread \"main\" org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS ",
+            processor.start()
+            monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.',
                                timeout_sec=60,
-                               err_msg="Exception in thread \"main\" org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS " + str(processor.node.account))
+                               err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.' error message " + str(processor.node.account))
 
         self.kafka.stop()
 
@@ -98,13 +97,12 @@ class StreamsBrokerCompatibility(Test):
         self.kafka.start()
 
         processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
-        processor.start()
 
-        processor.node.account.ssh(processor.start_cmd(processor.node))
         with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
-            monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException: Could not create topic kafka-streams-system-test-broker-compatibility-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.',
+            processor.start()
+            monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support CREATE_TOPICS',
                         timeout_sec=60,
-                        err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException: Could not create topic kafka-streams-system-test-broker-compatibility-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.' error message " + str(processor.node.account))
+                        err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support CREATE_TOPICS' error message " + str(processor.node.account))
 
         self.kafka.stop()
 
@@ -116,10 +114,9 @@ class StreamsBrokerCompatibility(Test):
         self.kafka.start()
 
         processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
-        processor.start()
 
-        processor.node.account.ssh(processor.start_cmd(processor.node))
         with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
+            processor.start()
             monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException: Could not find any available broker.',
                                timeout_sec=60,
                                err_msg="Never saw 'no available brokers' error message " + str(processor.node.account))

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.