Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/07 00:48:37 UTC

kafka git commit: KAFKA-5362; Follow-up to Streams EOS system test

Repository: kafka
Updated Branches:
  refs/heads/trunk 91517e8fb -> 51063441d


KAFKA-5362; Follow-up to Streams EOS system test

 - improve the tests to get rid of calls to `sleep` in Python by waiting on log output instead (see the sketch after this list)
 - fix some flaky test conditions
 - improve debugging output
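
For context, the sketch below illustrates the pattern the reworked Python test uses instead of fixed sleeps: open a ducktape log monitor on the client's stdout before triggering an action, then block until the expected output appears, i.e. the "StateChange: ..." lines added to EosTestClient and the existing "processed ... records" output from SmokeTestUtil. The helper name start_and_await_processing is illustrative only; monitor_log, wait_until, and the expected messages are the ones used in the diff further below.

    # Hedged sketch, not part of the commit: the helper name is hypothetical, but the
    # monitor_log/wait_until usage and expected log lines mirror streams_eos_test.py.
    def start_and_await_processing(self, processor):
        # Open the monitor before starting so no output is missed, then wait for the
        # state transition and for actual processing progress instead of sleeping.
        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
            processor.start()
            monitor.wait_until("StateChange: REBALANCING -> RUNNING",
                               timeout_sec=300,
                               err_msg="Never saw REBALANCING -> RUNNING on " + str(processor.node.account))
            monitor.wait_until("processed 500 records from topic=data",
                               timeout_sec=300,
                               err_msg="Never saw processing progress on " + str(processor.node.account))

The helpers in the diff (add_streams, stop_streams, abort_streams, and their two- and three-processor variants) chain this same pattern so that every start, stop, and abort is confirmed by observed output rather than a fixed 120-second sleep.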

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3542 from mjsax/failing-eos-system-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51063441
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51063441
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51063441

Branch: refs/heads/trunk
Commit: 51063441d3ed4dec8a96794f085028b4b8feb20c
Parents: 91517e8
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Fri Oct 6 17:48:34 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 6 17:48:34 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/tests/EosTestClient.java      |  19 ++-
 .../kafka/streams/tests/EosTestDriver.java      |  77 +++++++++--
 .../kafka/streams/tests/SmokeTestUtil.java      |   1 +
 .../kafka/streams/tests/StreamsEosTest.java     |   3 +
 tests/kafkatest/services/streams.py             |   2 +-
 .../kafkatest/tests/streams/streams_eos_test.py | 133 ++++++++++++-------
 6 files changed, 166 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 751fc97..098b77b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -53,10 +53,12 @@ public class EosTestClient extends SmokeTestUtil {
             @Override
             public void run() {
                 isRunning = false;
-                streams.close(5, TimeUnit.SECONDS);
+                streams.close(TimeUnit.SECONDS.toMillis(300), TimeUnit.MILLISECONDS);
                 // do not remove these printouts since they are needed for health scripts
                 if (!uncaughtException) {
+                    System.out.println(System.currentTimeMillis());
                     System.out.println("EOS-TEST-CLIENT-CLOSED");
+                    System.out.flush();
                 }
             }
         }));
@@ -69,15 +71,26 @@ public class EosTestClient extends SmokeTestUtil {
                 streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                     @Override
                     public void uncaughtException(final Thread t, final Throwable e) {
+                        System.out.println(System.currentTimeMillis());
                         System.out.println("EOS-TEST-CLIENT-EXCEPTION");
                         e.printStackTrace();
+                        System.out.flush();
                         uncaughtException = true;
                     }
                 });
+                streams.setStateListener(new KafkaStreams.StateListener() {
+                    @Override
+                    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
+                        // don't remove this -- it's required test output
+                        System.out.println(System.currentTimeMillis());
+                        System.out.println("StateChange: " + oldState + " -> " + newState);
+                        System.out.flush();
+                    }
+                });
                 streams.start();
             }
             if (uncaughtException) {
-                streams.close(5, TimeUnit.SECONDS);
+                streams.close(TimeUnit.SECONDS.toMillis(60), TimeUnit.MILLISECONDS);
                 streams = null;
             }
             sleep(1000);
@@ -90,7 +103,7 @@ public class EosTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 0c1e16b..7c7485d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -54,15 +55,23 @@ import java.util.Set;
 public class EosTestDriver extends SmokeTestUtil {
 
     private static final int MAX_NUMBER_OF_KEYS = 100;
-    private static final long MAX_IDLE_TIME_MS = 300000L;
+    private static final long MAX_IDLE_TIME_MS = 600000L;
 
     private static boolean isRunning = true;
 
+    static int numRecordsProduced = 0;
+
+    static synchronized void updateNumRecordsProduces(final int delta) {
+        numRecordsProduced += delta;
+    }
+
     static void generate(final String kafka) {
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
+                System.out.println("Terminating");
+                System.out.flush();
                 isRunning = false;
             }
         });
@@ -78,7 +87,6 @@ public class EosTestDriver extends SmokeTestUtil {
 
         final Random rand = new Random(System.currentTimeMillis());
 
-        int numRecordsProduced = 0;
         while (isRunning) {
             final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
             final int value = rand.nextInt(10000);
@@ -89,20 +97,47 @@ public class EosTestDriver extends SmokeTestUtil {
                 @Override
                 public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                     if (exception != null) {
-                        exception.printStackTrace();
-                        Exit.exit(1);
+                        exception.printStackTrace(System.err);
+                        System.err.flush();
+                        if (exception instanceof TimeoutException) {
+                            try {
+                                // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
+                                final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
+                                updateNumRecordsProduces(-expired);
+                            } catch (Exception ignore) { }
+                        }
                     }
                 }
             });
 
-            numRecordsProduced++;
+            updateNumRecordsProduces(1);
             if (numRecordsProduced % 1000 == 0) {
                 System.out.println(numRecordsProduced + " records produced");
+                System.out.flush();
             }
-            Utils.sleep(rand.nextInt(50));
+            Utils.sleep(rand.nextInt(10));
         }
         producer.close();
-        System.out.println(numRecordsProduced + " records produced");
+        System.out.println("Producer closed: " + numRecordsProduced + " records produced");
+
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, "data");
+            System.out.println("Partitions: " + partitions);
+            consumer.assign(partitions);
+            consumer.seekToEnd(partitions);
+
+            for (final TopicPartition tp : partitions) {
+                System.out.println("End-offset for " + tp + " is " + consumer.position(tp));
+            }
+        }
+        System.out.flush();
     }
 
     public static void verify(final String kafka, final boolean withRepartitioning) {
@@ -180,6 +215,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
         // do not modify: required test output
         System.out.println("ALL-RECORDS-DELIVERED");
+        System.out.flush();
     }
 
     private static void ensureStreamsApplicationDown(final String kafka) {
@@ -190,7 +226,7 @@ public class EosTestDriver extends SmokeTestUtil {
             final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
             while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
                 if (System.currentTimeMillis() > maxWaitTime) {
-                    throw new RuntimeException("Streams application not down after 30 seconds.");
+                    throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
                 }
                 sleep(1000);
             }
@@ -240,16 +276,20 @@ public class EosTestDriver extends SmokeTestUtil {
                                                                                                      final Map<TopicPartition, Long> readEndOffsets,
                                                                                                      final boolean withRepartitioning,
                                                                                                      final boolean isInputTopic) {
+        System.err.println("read end offset: " + readEndOffsets);
         final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
+        final Map<TopicPartition, Long> maxReceivedOffsetPerPartition = new HashMap<>();
+        final Map<TopicPartition, Long> maxConsumerPositionPerPartition = new HashMap<>();
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
 
             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                 final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+                maxReceivedOffsetPerPartition.put(tp, record.offset());
                 final long readEndOffset = readEndOffsets.get(tp);
                 if (record.offset() < readEndOffset) {
                     addRecord(record, recordPerTopicPerPartition, withRepartitioning);
@@ -257,7 +297,11 @@ public class EosTestDriver extends SmokeTestUtil {
                     throw new RuntimeException("FAIL: did receive more records than expected for " + tp
                         + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
                 }
-                if (consumer.position(tp) >= readEndOffset) {
+            }
+
+            for (final TopicPartition tp : readEndOffsets.keySet()) {
+                maxConsumerPositionPerPartition.put(tp, consumer.position(tp));
+                if (consumer.position(tp) >= readEndOffsets.get(tp)) {
                     consumer.pause(Collections.singletonList(tp));
                 }
             }
@@ -266,7 +310,10 @@ public class EosTestDriver extends SmokeTestUtil {
         }
 
         if (!allRecordsReceived) {
-            throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
+            System.err.println("Pause partitions (ie, received all data): " + consumer.paused());
+            System.err.println("Max received offset per partition: " + maxReceivedOffsetPerPartition);
+            System.err.println("Max consumer position per partition: " + maxConsumerPositionPerPartition);
+            throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000) + " sec idle time.");
         }
 
         return recordPerTopicPerPartition;
@@ -530,7 +577,8 @@ public class EosTestDriver extends SmokeTestUtil {
                     @Override
                     public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                         if (exception != null) {
-                            exception.printStackTrace();
+                            exception.printStackTrace(System.err);
+                            System.err.flush();
                             Exit.exit(1);
                         }
                     }
@@ -540,10 +588,11 @@ public class EosTestDriver extends SmokeTestUtil {
 
         final StringDeserializer stringDeserializer = new StringDeserializer();
 
-        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
             final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
             for (final ConsumerRecord<byte[], byte[]> record : records) {
+                maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                 final String topic = record.topic();
                 final TopicPartition tp = new TopicPartition(topic, record.partition());
 
@@ -564,7 +613,7 @@ public class EosTestDriver extends SmokeTestUtil {
             }
         }
         if (!partitions.isEmpty()) {
-            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last 30 sec.");
+            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000) + " sec.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index fc808e7..dc4c91b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -60,6 +60,7 @@ public class SmokeTestUtil {
                         }
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
+                            System.out.println(System.currentTimeMillis());
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index 27fdc2d..4921143 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -33,6 +33,7 @@ public class StreamsEosTest {
         System.out.println("kafka=" + kafka);
         System.out.println("stateDir=" + stateDir);
         System.out.println("command=" + command);
+        System.out.flush();
 
         if (command == null || stateDir == null) {
             System.exit(-1);
@@ -56,6 +57,8 @@ public class StreamsEosTest {
                 break;
             default:
                 System.out.println("unknown command: " + command);
+                System.out.flush();
+                System.exit(-1);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a0d9c57..3719feb 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -135,7 +135,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         self.logger.info("Starting StreamsTest process on " + str(node.account))
         with node.account.monitor_log(self.STDOUT_FILE) as monitor:
             node.account.ssh(self.start_cmd(node))
-            monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
+            monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/tests/kafkatest/tests/streams/streams_eos_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 60dc4b1..0863e25 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -19,7 +19,6 @@ from ducktape.mark import ignore
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
     StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
-import time
 
 
 class StreamsEosTest(KafkaTest):
@@ -29,106 +28,138 @@ class StreamsEosTest(KafkaTest):
 
     def __init__(self, test_context):
         super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
-            'data' : { 'partitions': 5, 'replication-factor': 2 },
-            'echo' : { 'partitions': 5, 'replication-factor': 2 },
-            'min' : { 'partitions': 5, 'replication-factor': 2 },
-            'sum' : { 'partitions': 5, 'replication-factor': 2 },
-            'repartition' : { 'partitions': 5, 'replication-factor': 2 },
-            'max' : { 'partitions': 5, 'replication-factor': 2 },
-            'cnt' : { 'partitions': 5, 'replication-factor': 2 }
+            'data': {'partitions': 5, 'replication-factor': 2},
+            'echo': {'partitions': 5, 'replication-factor': 2},
+            'min': {'partitions': 5, 'replication-factor': 2},
+            'sum': {'partitions': 5, 'replication-factor': 2},
+            'repartition': {'partitions': 5, 'replication-factor': 2},
+            'max': {'partitions': 5, 'replication-factor': 2},
+            'cnt': {'partitions': 5, 'replication-factor': 2}
         })
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
         self.test_context = test_context
 
-    @ignore
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_rebalance_simple(self):
         self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+                           StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    @ignore
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_rebalance_complex(self):
         self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    def run_rebalance(self, processor1, processor2, verifier):
+    def run_rebalance(self, processor1, processor2, processor3, verifier):
         """
         Starts and stops two test clients a few times.
         Ensure that all records are delivered exactly-once.
         """
 
         self.driver.start()
-        processor1.start()
-
-        time.sleep(120)
-
-        processor2.start()
 
-        time.sleep(120)
-        processor1.stop()
-
-        time.sleep(120)
-        processor1.start()
-
-        time.sleep(120)
-        processor2.stop()
-
-        time.sleep(120)
+        self.add_streams(processor1)
+        self.add_streams2(processor1, processor2)
+        self.add_streams3(processor1, processor2, processor3)
+        self.stop_streams3(processor2, processor3, processor1)
+        self.add_streams3(processor2, processor3, processor1)
+        self.stop_streams3(processor1, processor3, processor2)
+        self.stop_streams2(processor1, processor3)
+        self.stop_streams(processor1)
 
         self.driver.stop()
 
-        processor1.stop()
-        processor2.stop()
-
         verifier.start()
         verifier.wait()
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
-    @ignore
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_failure_and_recovery(self):
         self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    @ignore
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_failure_and_recovery_complex(self):
         self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    def run_failure_and_recovery(self, processor1, processor2, verifier):
+    def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
         """
         Starts two test clients, then abort (kill -9) and restart them a few times.
         Ensure that all records are delivered exactly-once.
         """
 
         self.driver.start()
-        processor1.start()
-        processor2.start()
-
-        time.sleep(120)
-        processor1.abortThenRestart()
-
-        time.sleep(120)
-        processor1.abortThenRestart()
 
-        time.sleep(120)
-        processor2.abortThenRestart()
-
-        time.sleep(120)
+        self.add_streams(processor1)
+        self.add_streams2(processor1, processor2)
+        self.add_streams3(processor1, processor2, processor3)
+        self.abort_streams(processor2, processor3, processor1)
+        self.add_streams3(processor2, processor3, processor1)
+        self.abort_streams(processor2, processor3, processor1)
+        self.add_streams3(processor2, processor3, processor1)
+        self.abort_streams(processor1, processor3, processor2)
+        self.stop_streams2(processor1, processor3)
+        self.stop_streams(processor1)
 
         self.driver.stop()
 
-        processor1.stop()
-        processor2.stop()
-
         verifier.start()
         verifier.wait()
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
+
+    def add_streams(self, processor):
+        processor.start()
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            self.wait_for_startup(monitor, processor)
+
+    def add_streams2(self, running_processor, processor_to_be_started):
+        with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
+            self.add_streams(processor_to_be_started)
+            self.wait_for_startup(monitor, running_processor)
+
+    def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
+        with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
+            self.add_streams2(running_processor2, processor_to_be_started)
+            self.wait_for_startup(monitor, running_processor1)
+
+    def stop_streams(self, processor_to_be_stopped):
+        with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
+            processor_to_be_stopped.stop()
+            self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
+
+    def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
+        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
+            self.stop_streams(processor_to_be_stopped)
+            self.wait_for_startup(monitor, keep_alive_processor)
+
+    def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
+            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
+            self.wait_for_startup(monitor, keep_alive_processor1)
+
+    def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
+            with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
+                processor_to_be_aborted.stop_nodes(False)
+                self.wait_for_startup(monitor2, keep_alive_processor2)
+            self.wait_for_startup(monitor1, keep_alive_processor1)
+
+    def wait_for_startup(self, monitor, processor):
+        self.wait_for(monitor, processor, "StateChange: RUNNING -> REBALANCING")
+        self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
+        self.wait_for(monitor, processor, "processed 500 records from topic=data")
+
+    def wait_for(self, monitor, processor, output):
+        monitor.wait_until(output,
+                           timeout_sec=300,
+                           err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))