You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/07/10 19:21:08 UTC

[kafka] branch 2.5 updated: KAFKA-10191 fix flaky StreamsOptimizedTest (#8913)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new edce73f  KAFKA-10191 fix flaky StreamsOptimizedTest (#8913)
edce73f is described below

commit edce73fedb92feb35d3b3521e01e77f2a2f07e93
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Wed Jul 8 01:48:36 2020 +0800

    KAFKA-10191 fix flaky StreamsOptimizedTest (#8913)
    
    Call KafkaStreams#cleanUp to reset local state before starting the application for the second run.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Boyang Chen <bo...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../main/scala/kafka/tools/StreamsResetter.java    |  1 -
 .../kafka/streams/tests/StreamsOptimizedTest.java  |  1 +
 tests/kafkatest/services/streams.py                | 45 +++++++++++++++++++++-
 .../tests/streams/streams_optimized_test.py        |  9 +++++
 4 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 574e9c6..415069e 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -162,7 +162,6 @@ public class StreamsResetter {
             consumerConfig.putAll(properties);
             exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
             maybeDeleteInternalTopics(adminClient, dryRun);
-
         } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
             System.err.println("ERROR: " + e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
index afec99d..f110e57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -130,6 +130,7 @@ public class StreamsOptimizedTest {
             }
         });
 
+        streams.cleanUp();
         streams.start();
 
         Exit.addShutdownHook("streams-shutdown-hook", () -> {
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 3767d85..c8e0732 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -210,6 +210,10 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     def node(self):
         return self.nodes[0]
 
+    @property
+    def expectedMessage(self):
+        return 'StreamsTest instance started'
+
     def pids(self, node):
         try:
             return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
@@ -293,7 +297,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Starting StreamsTest process on " + str(node.account))
         with node.account.monitor_log(self.STDOUT_FILE) as monitor:
             node.account.ssh(self.start_cmd(node))
-            monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
+            monitor.wait_until(self.expectedMessage, timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -436,6 +440,45 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
                                                         "org.apache.kafka.streams.tests.StreamsStandByReplicaTest",
                                                         configs)
 
+class StreamsResetter(StreamsTestBaseService):
+    def __init__(self, test_context, kafka, topic, applicationId):
+        super(StreamsResetter, self).__init__(test_context,
+                                              kafka,
+                                              "kafka.tools.StreamsResetter",
+                                              "")
+        self.topic = topic
+        self.applicationId = applicationId
+
+    @property
+    def expectedMessage(self):
+        return 'Done.'
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['bootstrap.servers'] = self.kafka.bootstrap_servers()
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['application.id'] = self.applicationId
+        args['input.topics'] = self.topic
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "(export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "%(kafka_run_class)s %(streams_class_name)s " \
+              "--bootstrap-servers %(bootstrap.servers)s " \
+              "--force " \
+              "--application-id %(application.id)s " \
+              "--input-topics %(input.topics)s " \
+              "& echo $! >&3 ) " \
+              "1>> %(stdout)s " \
+              "2>> %(stderr)s " \
+              "3> %(pidfile)s "% args
+
+        self.logger.info("Executing: " + cmd)
+
+        return cmd
+
 
 class StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
     def __init__(self, test_context, kafka):
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
index ecd84c2..adea2ea 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -17,6 +17,7 @@ import time
 from ducktape.tests.test import Test
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
+from kafkatest.services.streams import StreamsResetter
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import stop_processors
@@ -77,6 +78,8 @@ class StreamsOptimizedTest(Test):
 
         stop_processors(processors, self.stopped_message)
 
+        self.reset_application()
+
         # start again with topology optimized
         for processor in processors:
             processor.OPTIMIZED_CONFIG = 'all'
@@ -90,6 +93,12 @@ class StreamsOptimizedTest(Test):
         self.kafka.stop()
         self.zookeeper.stop()
 
+    def reset_application(self):
+        resetter = StreamsResetter(self.test_context, self.kafka, topic = self.input_topic, applicationId = 'StreamsOptimizedTest')
+        resetter.start()
+        # resetter is not long-term running but it would be better to check the pid by stopping it
+        resetter.stop()
+
     @staticmethod
     def verify_running_repartition_topic_count(processor, repartition_topic_count):
         node = processor.node