You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/21 23:57:57 UTC
[kafka] branch 1.1 updated: MINOR: Fix Streams-Broker-Compatibility
system test (#4594)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new b3a9307 MINOR: Fix Streams-Broker-Compatibility system test (#4594)
b3a9307 is described below
commit b3a93073ccb66a711644a62d097619121005b511
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Feb 21 15:53:46 2018 -0800
MINOR: Fix Streams-Broker-Compatibility system test (#4594)
fixes error message handling for test consumer client and KafkaStreams instance
updates expected error message
fixes race condition in system test code and avoids starting Streams processor twice
Author: Matthias J. Sax <ma...@confluent.io.>
Reviewer: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../streams/tests/BrokerCompatibilityTest.java | 48 +++++++++++++---------
.../streams/streams_broker_compatibility_test.py | 17 ++++----
2 files changed, 36 insertions(+), 29 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index b308782..af76304 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.test.TestUtils;
@@ -93,7 +94,13 @@ public class BrokerCompatibilityTest {
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
- System.err.println("FATAL: An unexpected exception " + e);
+ Throwable cause = e;
+ if (cause instanceof StreamsException) {
+ while (cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ }
+ System.err.println("FATAL: An unexpected exception " + cause);
e.printStackTrace(System.err);
System.err.flush();
streams.close(30, TimeUnit.SECONDS);
@@ -109,17 +116,20 @@ public class BrokerCompatibilityTest {
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);
- producer.send(new ProducerRecord<>(SOURCE_TOPIC, "key", "value"));
-
-
- System.out.println("wait for result");
- loopUntilRecordReceived(kafka, eosEnabled);
+ try {
+ try (final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);) {
+ producer.send(new ProducerRecord<>(SOURCE_TOPIC, "key", "value"));
-
- System.out.println("close Kafka Streams");
- producer.close();
- streams.close();
+ System.out.println("wait for result");
+ loopUntilRecordReceived(kafka, eosEnabled);
+ System.out.println("close Kafka Streams");
+ streams.close();
+ }
+ } catch (final RuntimeException e) {
+ System.err.println("Non-Streams exception occurred: ");
+ e.printStackTrace(System.err);
+ System.err.flush();
+ }
}
private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
@@ -133,15 +143,15 @@ public class BrokerCompatibilityTest {
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
}
- final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
- consumer.subscribe(Collections.singletonList(SINK_TOPIC));
+ try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
+ consumer.subscribe(Collections.singletonList(SINK_TOPIC));
- while (true) {
- final ConsumerRecords<String, String> records = consumer.poll(100);
- for (final ConsumerRecord<String, String> record : records) {
- if (record.key().equals("key") && record.value().equals("1")) {
- consumer.close();
- return;
+ while (true) {
+ final ConsumerRecords<String, String> records = consumer.poll(100);
+ for (final ConsumerRecord<String, String> record : records) {
+ if (record.key().equals("key") && record.value().equals("1")) {
+ return;
+ }
}
}
}
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index b00b9bb..5370a39 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -63,13 +63,12 @@ class StreamsBrokerCompatibility(Test):
self.kafka.start()
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, True)
- processor.start()
- processor.node.account.ssh(processor.start_cmd(processor.node))
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
- monitor.wait_until("Exception in thread \"main\" org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS ",
+ processor.start()
+ monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.',
timeout_sec=60,
- err_msg="Exception in thread \"main\" org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS " + str(processor.node.account))
+ err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException: Cannot create a v0 FindCoordinator request because we require features supported only in 1 or later.' error message " + str(processor.node.account))
self.kafka.stop()
@@ -98,13 +97,12 @@ class StreamsBrokerCompatibility(Test):
self.kafka.start()
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
- processor.start()
- processor.node.account.ssh(processor.start_cmd(processor.node))
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
- monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException: Could not create topic kafka-streams-system-test-broker-compatibility-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.',
+ processor.start()
+ monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support CREATE_TOPICS',
timeout_sec=60,
- err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.streams.errors.StreamsException: Could not create topic kafka-streams-system-test-broker-compatibility-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog.' error message " + str(processor.node.account))
+ err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support CREATE_TOPICS' error message " + str(processor.node.account))
self.kafka.stop()
@@ -116,10 +114,9 @@ class StreamsBrokerCompatibility(Test):
self.kafka.start()
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, False)
- processor.start()
- processor.node.account.ssh(processor.start_cmd(processor.node))
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
+ processor.start()
monitor.wait_until('Exception in thread "main" org.apache.kafka.streams.errors.BrokerNotFoundException: Could not find any available broker.',
timeout_sec=60,
err_msg="Never saw 'no available brokers' error message " + str(processor.node.account))
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.