You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/07/10 19:21:08 UTC
[kafka] branch 2.5 updated: KAFKA-10191 fix flaky StreamsOptimizedTest (#8913)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new edce73f KAFKA-10191 fix flaky StreamsOptimizedTest (#8913)
edce73f is described below
commit edce73fedb92feb35d3b3521e01e77f2a2f07e93
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Wed Jul 8 01:48:36 2020 +0800
KAFKA-10191 fix flaky StreamsOptimizedTest (#8913)
Call KafkaStreams#cleanUp to reset local state before starting the application up for the second run.
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Boyang Chen <bo...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../main/scala/kafka/tools/StreamsResetter.java | 1 -
.../kafka/streams/tests/StreamsOptimizedTest.java | 1 +
tests/kafkatest/services/streams.py | 45 +++++++++++++++++++++-
.../tests/streams/streams_optimized_test.py | 9 +++++
4 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 574e9c6..415069e 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -162,7 +162,6 @@ public class StreamsResetter {
consumerConfig.putAll(properties);
exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
maybeDeleteInternalTopics(adminClient, dryRun);
-
} catch (final Throwable e) {
exitCode = EXIT_CODE_ERROR;
System.err.println("ERROR: " + e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
index afec99d..f110e57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -130,6 +130,7 @@ public class StreamsOptimizedTest {
}
});
+ streams.cleanUp();
streams.start();
Exit.addShutdownHook("streams-shutdown-hook", () -> {
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 3767d85..c8e0732 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -210,6 +210,10 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
def node(self):
return self.nodes[0]
+ @property
+ def expectedMessage(self):
+ return 'StreamsTest instance started'
+
def pids(self, node):
try:
return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
@@ -293,7 +297,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
self.logger.info("Starting StreamsTest process on " + str(node.account))
with node.account.monitor_log(self.STDOUT_FILE) as monitor:
node.account.ssh(self.start_cmd(node))
- monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
+ monitor.wait_until(self.expectedMessage, timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
@@ -436,6 +440,45 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
"org.apache.kafka.streams.tests.StreamsStandByReplicaTest",
configs)
+class StreamsResetter(StreamsTestBaseService):
+ def __init__(self, test_context, kafka, topic, applicationId):
+ super(StreamsResetter, self).__init__(test_context,
+ kafka,
+ "kafka.tools.StreamsResetter",
+ "")
+ self.topic = topic
+ self.applicationId = applicationId
+
+ @property
+ def expectedMessage(self):
+ return 'Done.'
+
+ def start_cmd(self, node):
+ args = self.args.copy()
+ args['bootstrap.servers'] = self.kafka.bootstrap_servers()
+ args['stdout'] = self.STDOUT_FILE
+ args['stderr'] = self.STDERR_FILE
+ args['pidfile'] = self.PID_FILE
+ args['log4j'] = self.LOG4J_CONFIG_FILE
+ args['application.id'] = self.applicationId
+ args['input.topics'] = self.topic
+ args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+ cmd = "(export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+ "%(kafka_run_class)s %(streams_class_name)s " \
+ "--bootstrap-servers %(bootstrap.servers)s " \
+ "--force " \
+ "--application-id %(application.id)s " \
+ "--input-topics %(input.topics)s " \
+ "& echo $! >&3 ) " \
+ "1>> %(stdout)s " \
+ "2>> %(stderr)s " \
+ "3> %(pidfile)s "% args
+
+ self.logger.info("Executing: " + cmd)
+
+ return cmd
+
class StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
index ecd84c2..adea2ea 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -17,6 +17,7 @@ import time
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
+from kafkatest.services.streams import StreamsResetter
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import stop_processors
@@ -77,6 +78,8 @@ class StreamsOptimizedTest(Test):
stop_processors(processors, self.stopped_message)
+ self.reset_application()
+
# start again with topology optimized
for processor in processors:
processor.OPTIMIZED_CONFIG = 'all'
@@ -90,6 +93,12 @@ class StreamsOptimizedTest(Test):
self.kafka.stop()
self.zookeeper.stop()
+ def reset_application(self):
+ resetter = StreamsResetter(self.test_context, self.kafka, topic = self.input_topic, applicationId = 'StreamsOptimizedTest')
+ resetter.start()
+ # resetter is not long-term running but it would be better to check the pid by stopping it
+ resetter.stop()
+
@staticmethod
def verify_running_repartition_topic_count(processor, repartition_topic_count):
node = processor.node