You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/07 00:48:37 UTC
kafka git commit: KAFKA-5362; Follow up to Streams EOS system test
Repository: kafka
Updated Branches:
refs/heads/trunk 91517e8fb -> 51063441d
KAFKA-5362; Follow up to Streams EOS system test
- improve tests to get rid of calls to `sleep` in Python
- fixed some flaky test conditions
- improve debugging
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3542 from mjsax/failing-eos-system-tests
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51063441
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51063441
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51063441
Branch: refs/heads/trunk
Commit: 51063441d3ed4dec8a96794f085028b4b8feb20c
Parents: 91517e8
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Oct 6 17:48:34 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 6 17:48:34 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/tests/EosTestClient.java | 19 ++-
.../kafka/streams/tests/EosTestDriver.java | 77 +++++++++--
.../kafka/streams/tests/SmokeTestUtil.java | 1 +
.../kafka/streams/tests/StreamsEosTest.java | 3 +
tests/kafkatest/services/streams.py | 2 +-
.../kafkatest/tests/streams/streams_eos_test.py | 133 ++++++++++++-------
6 files changed, 166 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 751fc97..098b77b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -53,10 +53,12 @@ public class EosTestClient extends SmokeTestUtil {
@Override
public void run() {
isRunning = false;
- streams.close(5, TimeUnit.SECONDS);
+ // wait up to 300 seconds for a clean shutdown; the timeout value must already be
+ // expressed in the unit passed as second argument (do not pre-convert it to millis)
+ streams.close(300, TimeUnit.SECONDS);
// do not remove these printouts since they are needed for health scripts
if (!uncaughtException) {
+ System.out.println(System.currentTimeMillis());
System.out.println("EOS-TEST-CLIENT-CLOSED");
+ System.out.flush();
}
}
}));
@@ -69,15 +71,26 @@ public class EosTestClient extends SmokeTestUtil {
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
+ System.out.println(System.currentTimeMillis());
System.out.println("EOS-TEST-CLIENT-EXCEPTION");
e.printStackTrace();
+ System.out.flush();
uncaughtException = true;
}
});
+ streams.setStateListener(new KafkaStreams.StateListener() {
+ @Override
+ public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
+ // don't remove this -- it's required test output
+ System.out.println(System.currentTimeMillis());
+ System.out.println("StateChange: " + oldState + " -> " + newState);
+ System.out.flush();
+ }
+ });
streams.start();
}
if (uncaughtException) {
- streams.close(5, TimeUnit.SECONDS);
+ // wait up to 60 seconds for the failed instance to close; the timeout value must
+ // already be expressed in the unit passed as second argument
+ streams.close(60, TimeUnit.SECONDS);
streams = null;
}
sleep(1000);
@@ -90,7 +103,7 @@ public class EosTestClient extends SmokeTestUtil {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 0c1e16b..7c7485d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -54,15 +55,23 @@ import java.util.Set;
public class EosTestDriver extends SmokeTestUtil {
private static final int MAX_NUMBER_OF_KEYS = 100;
- private static final long MAX_IDLE_TIME_MS = 300000L;
+ private static final long MAX_IDLE_TIME_MS = 600000L;
private static boolean isRunning = true;
+ static int numRecordsProduced = 0;
+
+ static synchronized void updateNumRecordsProduces(final int delta) {
+ numRecordsProduced += delta;
+ }
+
static void generate(final String kafka) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
+ System.out.println("Terminating");
+ System.out.flush();
isRunning = false;
}
});
@@ -78,7 +87,6 @@ public class EosTestDriver extends SmokeTestUtil {
final Random rand = new Random(System.currentTimeMillis());
- int numRecordsProduced = 0;
while (isRunning) {
final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
final int value = rand.nextInt(10000);
@@ -89,20 +97,47 @@ public class EosTestDriver extends SmokeTestUtil {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
- exception.printStackTrace();
- Exit.exit(1);
+ exception.printStackTrace(System.err);
+ System.err.flush();
+ if (exception instanceof TimeoutException) {
+ try {
+ // expected text: "[org.apache.kafka.common.errors.TimeoutException: ]Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time"
+ // getMessage() does not carry the class-name prefix that toString() adds, so a fixed token index
+ // is off by one; locate the record count relative to the "Expiring " marker instead
+ final String marker = "Expiring ";
+ final String message = exception.getMessage();
+ final int countStart = message.indexOf(marker) + marker.length();
+ if (countStart >= marker.length()) {
+ final int expired = Integer.parseInt(message.substring(countStart, message.indexOf(' ', countStart)));
+ // expired records were counted as produced when they were sent; take them back out
+ updateNumRecordsProduces(-expired);
+ }
+ } catch (Exception ignore) {
+ // best effort: if the message cannot be parsed, leave the counter unchanged
+ }
+ }
}
}
});
- numRecordsProduced++;
+ updateNumRecordsProduces(1);
if (numRecordsProduced % 1000 == 0) {
System.out.println(numRecordsProduced + " records produced");
+ System.out.flush();
}
- Utils.sleep(rand.nextInt(50));
+ Utils.sleep(rand.nextInt(10));
}
producer.close();
- System.out.println(numRecordsProduced + " records produced");
+ System.out.println("Producer closed: " + numRecordsProduced + " records produced");
+
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+
+ try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+ final List<TopicPartition> partitions = getAllPartitions(consumer, "data");
+ System.out.println("Partitions: " + partitions);
+ consumer.assign(partitions);
+ consumer.seekToEnd(partitions);
+
+ for (final TopicPartition tp : partitions) {
+ System.out.println("End-offset for " + tp + " is " + consumer.position(tp));
+ }
+ }
+ System.out.flush();
}
public static void verify(final String kafka, final boolean withRepartitioning) {
@@ -180,6 +215,7 @@ public class EosTestDriver extends SmokeTestUtil {
// do not modify: required test output
System.out.println("ALL-RECORDS-DELIVERED");
+ System.out.flush();
}
private static void ensureStreamsApplicationDown(final String kafka) {
@@ -190,7 +226,7 @@ public class EosTestDriver extends SmokeTestUtil {
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
if (System.currentTimeMillis() > maxWaitTime) {
- throw new RuntimeException("Streams application not down after 30 seconds.");
+ throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
}
sleep(1000);
}
@@ -240,16 +276,20 @@ public class EosTestDriver extends SmokeTestUtil {
final Map<TopicPartition, Long> readEndOffsets,
final boolean withRepartitioning,
final boolean isInputTopic) {
+ System.err.println("read end offset: " + readEndOffsets);
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
+ final Map<TopicPartition, Long> maxReceivedOffsetPerPartition = new HashMap<>();
+ final Map<TopicPartition, Long> maxConsumerPositionPerPartition = new HashMap<>();
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
boolean allRecordsReceived = false;
while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
- final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);
+ final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ maxReceivedOffsetPerPartition.put(tp, record.offset());
final long readEndOffset = readEndOffsets.get(tp);
if (record.offset() < readEndOffset) {
addRecord(record, recordPerTopicPerPartition, withRepartitioning);
@@ -257,7 +297,11 @@ public class EosTestDriver extends SmokeTestUtil {
throw new RuntimeException("FAIL: did receive more records than expected for " + tp
+ " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
}
- if (consumer.position(tp) >= readEndOffset) {
+ }
+
+ for (final TopicPartition tp : readEndOffsets.keySet()) {
+ maxConsumerPositionPerPartition.put(tp, consumer.position(tp));
+ if (consumer.position(tp) >= readEndOffsets.get(tp)) {
consumer.pause(Collections.singletonList(tp));
}
}
@@ -266,7 +310,10 @@ public class EosTestDriver extends SmokeTestUtil {
}
if (!allRecordsReceived) {
- throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
+ System.err.println("Pause partitions (ie, received all data): " + consumer.paused());
+ System.err.println("Max received offset per partition: " + maxReceivedOffsetPerPartition);
+ System.err.println("Max consumer position per partition: " + maxConsumerPositionPerPartition);
+ throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000) + " sec idle time.");
}
return recordPerTopicPerPartition;
@@ -530,7 +577,8 @@ public class EosTestDriver extends SmokeTestUtil {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
- exception.printStackTrace();
+ exception.printStackTrace(System.err);
+ System.err.flush();
Exit.exit(1);
}
}
@@ -540,10 +588,11 @@ public class EosTestDriver extends SmokeTestUtil {
final StringDeserializer stringDeserializer = new StringDeserializer();
- final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+ long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
for (final ConsumerRecord<byte[], byte[]> record : records) {
+ maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
final String topic = record.topic();
final TopicPartition tp = new TopicPartition(topic, record.partition());
@@ -564,7 +613,7 @@ public class EosTestDriver extends SmokeTestUtil {
}
}
if (!partitions.isEmpty()) {
- throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last 30 sec.");
+ throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000) + " sec.");
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index fc808e7..dc4c91b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -60,6 +60,7 @@ public class SmokeTestUtil {
}
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
+ System.out.println(System.currentTimeMillis());
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index 27fdc2d..4921143 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -33,6 +33,7 @@ public class StreamsEosTest {
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
System.out.println("command=" + command);
+ System.out.flush();
if (command == null || stateDir == null) {
System.exit(-1);
@@ -56,6 +57,8 @@ public class StreamsEosTest {
break;
default:
System.out.println("unknown command: " + command);
+ System.out.flush();
+ System.exit(-1);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a0d9c57..3719feb 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -135,7 +135,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
self.logger.info("Starting StreamsTest process on " + str(node.account))
with node.account.monitor_log(self.STDOUT_FILE) as monitor:
node.account.ssh(self.start_cmd(node))
- monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
+ monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/tests/kafkatest/tests/streams/streams_eos_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 60dc4b1..0863e25 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -19,7 +19,6 @@ from ducktape.mark import ignore
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
-import time
class StreamsEosTest(KafkaTest):
@@ -29,106 +28,138 @@ class StreamsEosTest(KafkaTest):
def __init__(self, test_context):
super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
- 'data' : { 'partitions': 5, 'replication-factor': 2 },
- 'echo' : { 'partitions': 5, 'replication-factor': 2 },
- 'min' : { 'partitions': 5, 'replication-factor': 2 },
- 'sum' : { 'partitions': 5, 'replication-factor': 2 },
- 'repartition' : { 'partitions': 5, 'replication-factor': 2 },
- 'max' : { 'partitions': 5, 'replication-factor': 2 },
- 'cnt' : { 'partitions': 5, 'replication-factor': 2 }
+ 'data': {'partitions': 5, 'replication-factor': 2},
+ 'echo': {'partitions': 5, 'replication-factor': 2},
+ 'min': {'partitions': 5, 'replication-factor': 2},
+ 'sum': {'partitions': 5, 'replication-factor': 2},
+ 'repartition': {'partitions': 5, 'replication-factor': 2},
+ 'max': {'partitions': 5, 'replication-factor': 2},
+ 'cnt': {'partitions': 5, 'replication-factor': 2}
})
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
self.test_context = test_context
- @ignore
- @cluster(num_nodes=8)
+ @cluster(num_nodes=9)
def test_rebalance_simple(self):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
- @ignore
- @cluster(num_nodes=8)
+ @cluster(num_nodes=9)
def test_rebalance_complex(self):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
- def run_rebalance(self, processor1, processor2, verifier):
+ def run_rebalance(self, processor1, processor2, processor3, verifier):
"""
Starts and stops three test clients a few times.
Ensure that all records are delivered exactly-once.
"""
self.driver.start()
- processor1.start()
-
- time.sleep(120)
-
- processor2.start()
- time.sleep(120)
- processor1.stop()
-
- time.sleep(120)
- processor1.start()
-
- time.sleep(120)
- processor2.stop()
-
- time.sleep(120)
+ self.add_streams(processor1)
+ self.add_streams2(processor1, processor2)
+ self.add_streams3(processor1, processor2, processor3)
+ self.stop_streams3(processor2, processor3, processor1)
+ self.add_streams3(processor2, processor3, processor1)
+ self.stop_streams3(processor1, processor3, processor2)
+ self.stop_streams2(processor1, processor3)
+ self.stop_streams(processor1)
self.driver.stop()
- processor1.stop()
- processor2.stop()
-
verifier.start()
verifier.wait()
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
- @ignore
- @cluster(num_nodes=8)
+ @cluster(num_nodes=9)
def test_failure_and_recovery(self):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
- @ignore
- @cluster(num_nodes=8)
+ @cluster(num_nodes=9)
def test_failure_and_recovery_complex(self):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
- def run_failure_and_recovery(self, processor1, processor2, verifier):
+ def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
"""
Starts three test clients, then aborts (kill -9) and restarts them a few times.
Ensure that all records are delivered exactly-once.
"""
self.driver.start()
- processor1.start()
- processor2.start()
-
- time.sleep(120)
- processor1.abortThenRestart()
-
- time.sleep(120)
- processor1.abortThenRestart()
- time.sleep(120)
- processor2.abortThenRestart()
-
- time.sleep(120)
+ self.add_streams(processor1)
+ self.add_streams2(processor1, processor2)
+ self.add_streams3(processor1, processor2, processor3)
+ self.abort_streams(processor2, processor3, processor1)
+ self.add_streams3(processor2, processor3, processor1)
+ self.abort_streams(processor2, processor3, processor1)
+ self.add_streams3(processor2, processor3, processor1)
+ self.abort_streams(processor1, processor3, processor2)
+ self.stop_streams2(processor1, processor3)
+ self.stop_streams(processor1)
self.driver.stop()
- processor1.stop()
- processor2.stop()
-
verifier.start()
verifier.wait()
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
+
+ def add_streams(self, processor):
+ processor.start()
+ with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ self.wait_for_startup(monitor, processor)
+
+ def add_streams2(self, running_processor, processor_to_be_started):
+ with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
+ self.add_streams(processor_to_be_started)
+ self.wait_for_startup(monitor, running_processor)
+
+ def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
+ with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
+ self.add_streams2(running_processor2, processor_to_be_started)
+ self.wait_for_startup(monitor, running_processor1)
+
+ def stop_streams(self, processor_to_be_stopped):
+ with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
+ processor_to_be_stopped.stop()
+ self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
+
+ def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
+ with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
+ self.stop_streams(processor_to_be_stopped)
+ self.wait_for_startup(monitor, keep_alive_processor)
+
+ def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
+ with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
+ self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
+ self.wait_for_startup(monitor, keep_alive_processor1)
+
+ def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
+ with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
+ with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
+ processor_to_be_aborted.stop_nodes(False)
+ self.wait_for_startup(monitor2, keep_alive_processor2)
+ self.wait_for_startup(monitor1, keep_alive_processor1)
+
+ def wait_for_startup(self, monitor, processor):
+ self.wait_for(monitor, processor, "StateChange: RUNNING -> REBALANCING")
+ self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
+ self.wait_for(monitor, processor, "processed 500 records from topic=data")
+
+ def wait_for(self, monitor, processor, output):
+ monitor.wait_until(output,
+ timeout_sec=300,
+ err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))