You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/06/07 20:52:34 UTC

[kafka] branch trunk updated: KAFKA-8331: stream static membership system test (#6877)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cca05ca  KAFKA-8331: stream static membership system test (#6877)
cca05ca is described below

commit cca05cace4105c829f303c13eed8ace2efd7fa0c
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Fri Jun 7 13:52:12 2019 -0700

    KAFKA-8331: stream static membership system test (#6877)
    
    As title suggested, we boost 3 stream instances stream job with one minute session timeout, and once the group is stable, doing couple of rolling bounces for the entire cluster. Every rejoin based on restart should have no generation bump on the client side.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>,  Bill Bejeck <bb...@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java    |  5 ++
 .../consumer/internals/ConsumerCoordinator.java    |  3 +-
 .../runtime/distributed/WorkerCoordinator.java     |  1 +
 .../streams/tests/StaticMemberTestClient.java      | 84 ++++++++++++++++++++++
 .../streams/tests/StreamsNamedRepartitionTest.java |  2 -
 .../{streams_property.py => consumer_property.py}  | 11 +--
 tests/kafkatest/services/streams.py                | 22 ++++++
 tests/kafkatest/services/streams_property.py       |  4 --
 .../streams_named_repartition_topic_test.py        | 35 ++-------
 .../tests/streams/streams_optimized_test.py        | 21 ++----
 ...c_test.py => streams_static_membership_test.py} | 83 ++++++++++-----------
 .../tests/streams/streams_upgrade_test.py          | 24 +++----
 .../streams/utils/__init__.py}                     | 12 +---
 tests/kafkatest/tests/streams/utils/util.py        | 36 ++++++++++
 14 files changed, 211 insertions(+), 132 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 54678f7..73563fd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -802,6 +802,11 @@ public abstract class AbstractCoordinator implements Closeable {
         return generation;
     }
 
+    protected synchronized String memberId() {
+        return generation == null ? JoinGroupRequest.UNKNOWN_MEMBER_ID :
+                generation.memberId;
+    }
+
     /**
      * Check whether given generation id is matching the record within current generation.
      * Only using in unit tests.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5d39da5..64bf17d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -585,7 +585,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     // visible for testing
     void invokeCompletedOffsetCommitCallbacks() {
         if (asyncCommitFenced.get()) {
-            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + groupInstanceId.orElse("unset_instance_id"));
+            throw new FencedInstanceIdException("Get fenced exception for group.instance.id: " +
+                    groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + memberId());
         }
         while (true) {
             OffsetCommitCompletion completion = completedOffsetCommits.poll();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index fd7c7a4..230a272 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -227,6 +227,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         return super.rejoinNeededOrPending() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
     }
 
+    @Override
     public String memberId() {
         Generation generation = generation();
         if (generation != null)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java
new file mode 100644
index 0000000..96ccad4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+
+import java.util.Objects;
+import java.util.Properties;
+
+public class StaticMemberTestClient {
+
+    private static String testName = "StaticMemberTestClient";
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println(testName + " requires one argument (properties-file) but none provided: ");
+        }
+
+        System.out.println("StreamsTest instance started");
+
+        final String propFileName = args[0];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final String groupInstanceId = Objects.requireNonNull(streamsProperties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
+
+        System.out.println(testName + " instance started with group.instance.id " + groupInstanceId);
+        System.out.println("props=" + streamsProperties);
+        System.out.flush();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic")));
+
+        final KStream dataStream = builder.stream(inputTopic);
+        dataStream.peek((k, v) ->  System.out.println(String.format("PROCESSED key=%s value=%s", k, v)));
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, testName);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        streams.setStateListener((newState, oldState) -> {
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                System.out.println("REBALANCING -> RUNNING");
+                System.out.flush();
+            }
+        });
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("Static membership test closed");
+                System.out.flush();
+            }
+        });
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index 911716f..c408d9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -117,7 +117,5 @@ public class StreamsNamedRepartitionTest {
             System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
             System.out.flush();
         }));
-
     }
-
 }
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/consumer_property.py
similarity index 84%
copy from tests/kafkatest/services/streams_property.py
copy to tests/kafkatest/services/consumer_property.py
index 054ea64..0a9756a 100644
--- a/tests/kafkatest/services/streams_property.py
+++ b/tests/kafkatest/services/consumer_property.py
@@ -14,13 +14,8 @@
 # limitations under the License.
 
 """
-Define Streams configuration property names here.
+Define Consumer configuration property names here.
 """
 
-STATE_DIR = "state.dir"
-KAFKA_SERVERS = "bootstrap.servers"
-NUM_THREADS = "num.stream.threads"
-
-
-
-
+GROUP_INSTANCE_ID = "group.instance.id"
+SESSION_TIMEOUT_MS = "session.timeout.ms"
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 5e2c2e9..70564b9 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -16,6 +16,7 @@
 import os.path
 import signal
 import streams_property
+import consumer_property
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@@ -534,3 +535,24 @@ class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
 
         cfg = KafkaConfig(**properties)
         return cfg.render()
+
+class StaticMemberTestService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka, group_instance_id, num_threads):
+        super(StaticMemberTestService, self).__init__(test_context,
+                                                      kafka,
+                                                      "org.apache.kafka.streams.tests.StaticMemberTestClient",
+                                                      "")
+        self.INPUT_TOPIC = None
+        self.GROUP_INSTANCE_ID = group_instance_id
+        self.NUM_THREADS = num_threads
+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      streams_property.NUM_THREADS: self.NUM_THREADS,
+                      consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID,
+                      consumer_property.SESSION_TIMEOUT_MS: 60000}
+
+        properties['input.topic'] = self.INPUT_TOPIC
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py
index 054ea64..99f0ece 100644
--- a/tests/kafkatest/services/streams_property.py
+++ b/tests/kafkatest/services/streams_property.py
@@ -20,7 +20,3 @@ Define Streams configuration property names here.
 STATE_DIR = "state.dir"
 KAFKA_SERVERS = "bootstrap.servers"
 NUM_THREADS = "num.stream.threads"
-
-
-
-
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
index b9894ee..1fcdd5f 100644
--- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
+++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
@@ -13,14 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsNamedRepartitionTopicService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
-
+from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running
 
 class StreamsNamedRepartitionTopicTest(Test):
     """
@@ -32,6 +30,7 @@ class StreamsNamedRepartitionTopicTest(Test):
     input_topic = 'inputTopic'
     aggregation_topic = 'aggregationTopic'
     pattern = 'AGGREGATED'
+    stopped_message = 'NAMED_REPARTITION_TEST Streams Stopped'
 
     def __init__(self, test_context):
         super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
@@ -66,43 +65,25 @@ class StreamsNamedRepartitionTopicTest(Test):
         for processor in processors:
             processor.CLEAN_NODE_ENABLED = False
             self.set_topics(processor)
-            self.verify_running(processor, 'REBALANCING -> RUNNING')
+            verify_running(processor, 'REBALANCING -> RUNNING')
 
         self.verify_processing(processors)
 
         # do rolling upgrade
         for processor in processors:
-            self.verify_stopped(processor)
+            verify_stopped(processor, self.stopped_message)
             #  will tell app to add operations before repartition topic
             processor.ADD_ADDITIONAL_OPS = 'true'
-            self.verify_running(processor, 'UPDATED Topology')
+            verify_running(processor, 'UPDATED Topology')
 
         self.verify_processing(processors)
 
-        self.stop_processors(processors)
+        stop_processors(processors, self.stopped_message)
 
         self.producer.stop()
         self.kafka.stop()
         self.zookeeper.stop()
 
-    @staticmethod
-    def verify_running(processor, message):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            monitor.wait_until(message,
-                               timeout_sec=60,
-                               err_msg="Never saw '%s' message " % message + str(processor.node.account))
-
-    @staticmethod
-    def verify_stopped(processor):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.stop()
-            monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped',
-                               timeout_sec=60,
-                               err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account))
-
     def verify_processing(self, processors):
         for processor in processors:
             with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
@@ -110,10 +91,6 @@ class StreamsNamedRepartitionTopicTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))
 
-    def stop_processors(self, processors):
-        for processor in processors:
-            self.verify_stopped(processor)
-
     def set_topics(self, processor):
         processor.INPUT_TOPIC = self.input_topic
         processor.AGGREGATION_TOPIC = self.aggregation_topic
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
index 31efc0d..ecd84c2 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -15,12 +15,11 @@
 
 import time
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
-
+from kafkatest.tests.streams.utils import stop_processors
 
 class StreamsOptimizedTest(Test):
     """
@@ -33,6 +32,7 @@ class StreamsOptimizedTest(Test):
     reduce_topic = 'reduceTopic'
     join_topic = 'joinTopic'
     operation_pattern = 'AGGREGATED\|REDUCED\|JOINED'
+    stopped_message = 'OPTIMIZE_TEST Streams Stopped'
 
     def __init__(self, test_context):
         super(StreamsOptimizedTest, self).__init__(test_context)
@@ -75,7 +75,7 @@ class StreamsOptimizedTest(Test):
 
         self.verify_processing(processors, verify_individual_operations=False)
 
-        self.stop_processors(processors)
+        stop_processors(processors, self.stopped_message)
 
         # start again with topology optimized
         for processor in processors:
@@ -84,7 +84,7 @@ class StreamsOptimizedTest(Test):
 
         self.verify_processing(processors, verify_individual_operations=True)
 
-        self.stop_processors(processors)
+        stop_processors(processors, self.stopped_message)
 
         self.producer.stop()
         self.kafka.stop()
@@ -100,15 +100,6 @@ class StreamsOptimizedTest(Test):
                                err_msg="Never saw 'REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' message "
                                        % repartition_topic_count + str(processor.node.account))
 
-    @staticmethod
-    def verify_stopped(processor):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.stop()
-            monitor.wait_until('OPTIMIZE_TEST Streams Stopped',
-                               timeout_sec=60,
-                               err_msg="'OPTIMIZE_TEST Streams Stopped' message" + str(processor.node.account))
-
     def verify_processing(self, processors, verify_individual_operations):
         for processor in processors:
             if not self.all_source_subtopology_tasks(processor):
@@ -139,10 +130,6 @@ class StreamsOptimizedTest(Test):
 
         return False
 
-    def stop_processors(self, processors):
-        for processor in processors:
-            self.verify_stopped(processor)
-
     def set_topics(self, processor):
         processor.INPUT_TOPIC = self.input_topic
         processor.AGGREGATION_TOPIC = self.aggregation_topic
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
similarity index 53%
copy from tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
copy to tests/kafkatest/tests/streams/streams_static_membership_test.py
index b9894ee..a466ea8 100644
--- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -13,31 +13,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.streams import StreamsNamedRepartitionTopicService
+from kafkatest.services.streams import StaticMemberTestService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs
 
-
-class StreamsNamedRepartitionTopicTest(Test):
+class StreamsStaticMembershipTest(Test):
     """
-    Tests using a named repartition topic by starting
-    application then doing a rolling upgrade with added
-    operations and the application still runs
+    Tests using static membership when broker points to minimum supported
+    version (2.3) or higher.
     """
 
     input_topic = 'inputTopic'
-    aggregation_topic = 'aggregationTopic'
-    pattern = 'AGGREGATED'
+    pattern = 'PROCESSED'
+    running_message = 'REBALANCING -> RUNNING'
+    stopped_message = 'Static membership test closed'
 
     def __init__(self, test_context):
-        super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
+        super(StreamsStaticMembershipTest, self).__init__(test_context)
         self.topics = {
-            self.input_topic: {'partitions': 6},
-            self.aggregation_topic: {'partitions': 6}
+            self.input_topic: {'partitions': 18},
         }
 
         self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
@@ -51,13 +48,14 @@ class StreamsNamedRepartitionTopicTest(Test):
                                            throughput=1000,
                                            acks=1)
 
-    def test_upgrade_topology_with_named_repartition_topic(self):
+    def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self):
         self.zookeeper.start()
         self.kafka.start()
 
-        processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
-        processor2 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
-        processor3 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
+        numThreads = 3
+        processor1 = StaticMemberTestService(self.test_context, self.kafka, "consumer-A", numThreads)
+        processor2 = StaticMemberTestService(self.test_context, self.kafka, "consumer-B", numThreads)
+        processor3 = StaticMemberTestService(self.test_context, self.kafka, "consumer-C", numThreads)
 
         processors = [processor1, processor2, processor3]
 
@@ -66,43 +64,39 @@ class StreamsNamedRepartitionTopicTest(Test):
         for processor in processors:
             processor.CLEAN_NODE_ENABLED = False
             self.set_topics(processor)
-            self.verify_running(processor, 'REBALANCING -> RUNNING')
+            verify_running(processor, self.running_message)
 
         self.verify_processing(processors)
 
-        # do rolling upgrade
+        # do several rolling bounces
+        num_bounces = 3
+        for i in range(0, num_bounces):
+            for processor in processors:
+                verify_stopped(processor, self.stopped_message)
+                verify_running(processor, self.running_message)
+
+        stable_generation = -1
         for processor in processors:
-            self.verify_stopped(processor)
-            #  will tell app to add operations before repartition topic
-            processor.ADD_ADDITIONAL_OPS = 'true'
-            self.verify_running(processor, 'UPDATED Topology')
+            generations = extract_generation_from_logs(processor)
+            num_bounce_generations = num_bounces * numThreads
+            assert num_bounce_generations <= len(generations), \
+                "Smaller than minimum expected %d generation messages, actual %d" % (num_bounce_generations, len(generations))
+
+            for generation in generations[-num_bounce_generations:]:
+                generation = int(generation)
+                if stable_generation == -1:
+                    stable_generation = generation
+                assert stable_generation == generation, \
+                    "Stream rolling bounce have caused unexpected generation bump %d" % generation
 
         self.verify_processing(processors)
 
-        self.stop_processors(processors)
+        stop_processors(processors, self.stopped_message)
 
         self.producer.stop()
         self.kafka.stop()
         self.zookeeper.stop()
 
-    @staticmethod
-    def verify_running(processor, message):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            monitor.wait_until(message,
-                               timeout_sec=60,
-                               err_msg="Never saw '%s' message " % message + str(processor.node.account))
-
-    @staticmethod
-    def verify_stopped(processor):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.stop()
-            monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped',
-                               timeout_sec=60,
-                               err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account))
-
     def verify_processing(self, processors):
         for processor in processors:
             with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
@@ -110,10 +104,5 @@ class StreamsNamedRepartitionTopicTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))
 
-    def stop_processors(self, processors):
-        for processor in processors:
-            self.verify_stopped(processor)
-
     def set_topics(self, processor):
         processor.INPUT_TOPIC = self.input_topic
-        processor.AGGREGATION_TOPIC = self.aggregation_topic
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 37ab770..6a6a8bc 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -23,6 +23,7 @@ from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
     StreamsUpgradeTestJobRunnerService
 from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.streams.utils import extract_generation_from_logs
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
     LATEST_2_0, LATEST_2_1, LATEST_2_2, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
@@ -45,7 +46,7 @@ anyone can verify that by calling
 curl https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz to download the jar
 and if it is not uploaded yet, ping the dev@kafka mailing list to request it being uploaded.
 
-This test needs to get updated, but this requires several steps
+This test needs to get updated, but this requires several steps,
 which are outlined here:
 
 1. Update all relevant versions in tests/kafkatest/version.py this will include adding a new version for the new
@@ -57,17 +58,17 @@ which are outlined here:
    during the system test run.
    
 3. Update the vagrant/bash.sh file to include all new versions, including the newly released version
-   and all point releases for existing releases.  You only need to list the latest version in 
+   and all point releases for existing releases. You only need to list the latest version in 
    this file.
    
 4. Then update all relevant versions in the tests/docker/Dockerfile
 
-5. Add a new "upgrade-system-tests-XXXX module under streams.  You can probably just copy the 
-   latest system test module from the last release.  Just make sure to update the systout print
-   statement in StreamsUpgradeTest to the version for the release.  After you add the new module
+5. Add a new upgrade-system-tests-XXXX module under streams. You can probably just copy the 
+   latest system test module from the last release. Just make sure to update the systout print
+   statement in StreamsUpgradeTest to the version for the release. After you add the new module
    you'll need to update settings.gradle file to include the name of the module you just created
-   for gradle to recognize the newly added module
-   
+   for gradle to recognize the newly added module.
+
 6. Then you'll need to update any version changes in gradle/dependencies.gradle
 
 """
@@ -598,9 +599,9 @@ class StreamsUpgradeTest(Test):
                     retries = 0
 
                     while retries < 10:
-                        processor_found = self.extract_generation_from_logs(processor)
-                        first_other_processor_found = self.extract_generation_from_logs(first_other_processor)
-                        second_other_processor_found = self.extract_generation_from_logs(second_other_processor)
+                        processor_found = extract_generation_from_logs(processor)
+                        first_other_processor_found = extract_generation_from_logs(first_other_processor)
+                        second_other_processor_found = extract_generation_from_logs(second_other_processor)
 
                         if len(processor_found) > 0 and len(first_other_processor_found) > 0 and len(second_other_processor_found) > 0:
                             self.logger.info("processor: " + str(processor_found))
@@ -632,9 +633,6 @@ class StreamsUpgradeTest(Test):
 
         return current_generation
 
-    def extract_generation_from_logs(self, processor):
-        return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))
-
     def extract_highest_generation(self, found_generations):
         return int(found_generations[-1])
 
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/tests/streams/utils/__init__.py
similarity index 83%
copy from tests/kafkatest/services/streams_property.py
copy to tests/kafkatest/tests/streams/utils/__init__.py
index 054ea64..6d2957f 100644
--- a/tests/kafkatest/services/streams_property.py
+++ b/tests/kafkatest/tests/streams/utils/__init__.py
@@ -13,14 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""
-Define Streams configuration property names here.
-"""
-
-STATE_DIR = "state.dir"
-KAFKA_SERVERS = "bootstrap.servers"
-NUM_THREADS = "num.stream.threads"
-
-
-
-
+from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs
diff --git a/tests/kafkatest/tests/streams/utils/util.py b/tests/kafkatest/tests/streams/utils/util.py
new file mode 100644
index 0000000..683b199
--- /dev/null
+++ b/tests/kafkatest/tests/streams/utils/util.py
@@ -0,0 +1,36 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+def verify_running(processor, message):
+    node = processor.node
+    with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+        processor.start()
+        monitor.wait_until(message,
+                           timeout_sec=60,
+                           err_msg="Never saw '%s' message " % message + str(processor.node.account))
+
+def verify_stopped(processor, message):
+    node = processor.node
+    with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+        processor.stop()
+        monitor.wait_until(message,
+                           timeout_sec=60,
+                           err_msg="'%s' message " % message + str(processor.node.account))
+
+def stop_processors(processors, stopped_message):
+    for processor in processors:
+        verify_stopped(processor, stopped_message)
+
+def extract_generation_from_logs(processor):
+    return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))