You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/16 21:16:27 UTC

[kafka] branch 1.0 updated: MINOR: Fix Streams EOS system tests (#4572)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new e00019f  MINOR: Fix Streams EOS system tests (#4572)
e00019f is described below

commit e00019f85d11c63ca5f83988ca0d7300710886af
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Feb 16 13:13:13 2018 -0800

    MINOR: Fix Streams EOS system tests (#4572)
    
    Avoid loosing log/stdout/stderr files on restart
    Reenables tests
    
    Author: Matthias J. Sax <ma...@confluent.io>
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 tests/kafkatest/services/streams.py               |  8 ++++++--
 tests/kafkatest/tests/streams/streams_eos_test.py | 10 ++++------
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 3719feb..f3f7348 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -154,12 +154,18 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
 class StreamsEosTestBaseService(StreamsTestBaseService):
     """Base class for Streams EOS Test services providing some common settings and functionality"""
 
+    clean_node_enabled = True
+
     def __init__(self, test_context, kafka, command):
         super(StreamsEosTestBaseService, self).__init__(test_context,
                                                         kafka,
                                                         "org.apache.kafka.streams.tests.StreamsEosTest",
                                                         command)
 
+    def clean_node(self, node):
+        if self.clean_node_enabled:
+            super.clean_node(self, node)
+
 
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
@@ -180,12 +186,10 @@ class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
 
-
 class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
 
-
 class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 27002e9..d6ac600 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -20,7 +20,6 @@ from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
     StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
 
-
 class StreamsEosTest(KafkaTest):
     """
     Test of Kafka Streams exactly-once semantics
@@ -39,7 +38,6 @@ class StreamsEosTest(KafkaTest):
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
         self.test_context = test_context
 
-    @ignore
     @cluster(num_nodes=9)
     def test_rebalance_simple(self):
         self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
@@ -47,7 +45,6 @@ class StreamsEosTest(KafkaTest):
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    @ignore
     @cluster(num_nodes=9)
     def test_rebalance_complex(self):
         self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
@@ -63,6 +60,8 @@ class StreamsEosTest(KafkaTest):
 
         self.driver.start()
 
+        processor1.clean_node_enabled = False
+
         self.add_streams(processor1)
         self.add_streams2(processor1, processor2)
         self.add_streams3(processor1, processor2, processor3)
@@ -79,7 +78,6 @@ class StreamsEosTest(KafkaTest):
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
-    @ignore
     @cluster(num_nodes=9)
     def test_failure_and_recovery(self):
         self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
@@ -87,7 +85,6 @@ class StreamsEosTest(KafkaTest):
                                       StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    @ignore
     @cluster(num_nodes=9)
     def test_failure_and_recovery_complex(self):
         self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
@@ -103,6 +100,8 @@ class StreamsEosTest(KafkaTest):
 
         self.driver.start()
 
+        processor1.clean_node_enabled = False
+
         self.add_streams(processor1)
         self.add_streams2(processor1, processor2)
         self.add_streams3(processor1, processor2, processor3)
@@ -159,7 +158,6 @@ class StreamsEosTest(KafkaTest):
         self.wait_for_startup(monitor1, keep_alive_processor1)
 
     def wait_for_startup(self, monitor, processor):
-        self.wait_for(monitor, processor, "StateChange: RUNNING -> REBALANCING")
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
         self.wait_for(monitor, processor, "processed 500 records from topic=data")
 

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.