You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/16 21:16:27 UTC
[kafka] branch 1.0 updated: MINOR: Fix Streams EOS system tests
(#4572)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new e00019f MINOR: Fix Streams EOS system tests (#4572)
e00019f is described below
commit e00019f85d11c63ca5f83988ca0d7300710886af
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Feb 16 13:13:13 2018 -0800
MINOR: Fix Streams EOS system tests (#4572)
Avoid losing log/stdout/stderr files on restart
Reenables tests
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
tests/kafkatest/services/streams.py | 8 ++++++--
tests/kafkatest/tests/streams/streams_eos_test.py | 10 ++++------
2 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 3719feb..f3f7348 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -154,12 +154,18 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
class StreamsEosTestBaseService(StreamsTestBaseService):
"""Base class for Streams EOS Test services providing some common settings and functionality"""
+ clean_node_enabled = True
+
def __init__(self, test_context, kafka, command):
super(StreamsEosTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsEosTest",
command)
+ def clean_node(self, node):
+ if self.clean_node_enabled:  # tests set this to False to keep log/stdout/stderr files across restarts
+ super(StreamsEosTestBaseService, self).clean_node(node)
+
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
@@ -180,12 +186,10 @@ class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
-
class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
-
class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 27002e9..d6ac600 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -20,7 +20,6 @@ from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
-
class StreamsEosTest(KafkaTest):
"""
Test of Kafka Streams exactly-once semantics
@@ -39,7 +38,6 @@ class StreamsEosTest(KafkaTest):
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
self.test_context = test_context
- @ignore
@cluster(num_nodes=9)
def test_rebalance_simple(self):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
@@ -47,7 +45,6 @@ class StreamsEosTest(KafkaTest):
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
- @ignore
@cluster(num_nodes=9)
def test_rebalance_complex(self):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
@@ -63,6 +60,8 @@ class StreamsEosTest(KafkaTest):
self.driver.start()
+ processor1.clean_node_enabled = False
+
self.add_streams(processor1)
self.add_streams2(processor1, processor2)
self.add_streams3(processor1, processor2, processor3)
@@ -79,7 +78,6 @@ class StreamsEosTest(KafkaTest):
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
- @ignore
@cluster(num_nodes=9)
def test_failure_and_recovery(self):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
@@ -87,7 +85,6 @@ class StreamsEosTest(KafkaTest):
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
- @ignore
@cluster(num_nodes=9)
def test_failure_and_recovery_complex(self):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
@@ -103,6 +100,8 @@ class StreamsEosTest(KafkaTest):
self.driver.start()
+ processor1.clean_node_enabled = False
+
self.add_streams(processor1)
self.add_streams2(processor1, processor2)
self.add_streams3(processor1, processor2, processor3)
@@ -159,7 +158,6 @@ class StreamsEosTest(KafkaTest):
self.wait_for_startup(monitor1, keep_alive_processor1)
def wait_for_startup(self, monitor, processor):
- self.wait_for(monitor, processor, "StateChange: RUNNING -> REBALANCING")
self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
self.wait_for(monitor, processor, "processed 500 records from topic=data")
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.