You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/25 16:34:31 UTC

kafka git commit: KAFKA-5362: Add Streams EOS system test with repartitioning topic

Repository: kafka
Updated Branches:
  refs/heads/trunk a5c47db13 -> 226583480


KAFKA-5362: Add Streams EOS system test with repartitioning topic

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #3310 from mjsax/kafka-5362-add-eos-system-tests-for-streams-api


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/22658348
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/22658348
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/22658348

Branch: refs/heads/trunk
Commit: 2265834803649d2f9fb090e86d11309f6e9789cb
Parents: a5c47db
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sun Jun 25 09:34:27 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Jun 25 09:34:27 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java |   2 +-
 .../internals/AbstractProcessorContext.java     |   8 +-
 .../kafka/streams/tests/EosTestClient.java      |  43 ++-
 .../kafka/streams/tests/EosTestDriver.java      | 375 ++++++++++++-------
 .../kafka/streams/tests/SmokeTestUtil.java      |  63 ++--
 .../kafka/streams/tests/StreamsEosTest.java     |  13 +-
 tests/kafkatest/services/streams.py             |  10 +
 .../kafkatest/tests/streams/streams_eos_test.py |  94 +++--
 8 files changed, 391 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 02bebbc..e7a8c44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -264,7 +264,7 @@ public class StreamsConfig extends AbstractConfig {
      */
     @Deprecated
     public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
-    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
+    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management. This config is deprecated and will be ignored as Streams API does not use Zookeeper anymore.";
 
     static {
         CONFIG = new ConfigDef()

http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index b306210..04af9f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -56,8 +56,8 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         this.config = config;
         this.metrics = metrics;
         this.stateManager = stateManager;
-        valueSerde = config.valueSerde();
-        keySerde = config.keySerde();
+        valueSerde = config.defaultValueSerde();
+        keySerde = config.defaultKeySerde();
         this.cache = cache;
     }
 
@@ -160,7 +160,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
-    public Map<String, Object> appConfigsWithPrefix(String prefix) {
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
         return config.originalsWithPrefix(prefix);
     }
 
@@ -171,7 +171,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
 
     @Override
     public RecordContext recordContext() {
-        return this.recordContext;
+        return recordContext;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index fe5dc64..7550109 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -34,13 +34,16 @@ public class EosTestClient extends SmokeTestUtil {
     static final String APP_ID = "EosTest";
     private final String kafka;
     private final File stateDir;
+    private final boolean withRepartitioning;
+
     private KafkaStreams streams;
     private boolean uncaughtException;
 
-    EosTestClient(final File stateDir, final String kafka) {
+    EosTestClient(final String kafka, final File stateDir, final boolean withRepartitioning) {
         super();
-        this.stateDir = stateDir;
         this.kafka = kafka;
+        this.stateDir = stateDir;
+        this.withRepartitioning = withRepartitioning;
     }
 
     private boolean isRunning = true;
@@ -81,8 +84,8 @@ public class EosTestClient extends SmokeTestUtil {
         }
     }
 
-    private static KafkaStreams createKafkaStreams(final File stateDir,
-                                                   final String kafka) {
+    private KafkaStreams createKafkaStreams(final File stateDir,
+                                            final String kafka) {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -144,6 +147,38 @@ public class EosTestClient extends SmokeTestUtil {
             "sum")
             .to(stringSerde, longSerde, "sum");
 
+        if (withRepartitioning) {
+            final KStream<String, Integer> repartitionedData = data.through("repartition");
+
+            repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"));
+
+            final KGroupedStream<String, Integer> groupedDataAfterRepartitioning = repartitionedData.groupByKey();
+            // max
+            groupedDataAfterRepartitioning
+                .aggregate(
+                    new Initializer<Integer>() {
+                        @Override
+                        public Integer apply() {
+                            return Integer.MIN_VALUE;
+                        }
+                    },
+                    new Aggregator<String, Integer, Integer>() {
+                        @Override
+                        public Integer apply(final String aggKey,
+                                             final Integer value,
+                                             final Integer aggregate) {
+                            return (value > aggregate) ? value : aggregate;
+                        }
+                    },
+                    intSerde,
+                    "max")
+                .to(stringSerde, intSerde, "max");
+
+            // count
+            groupedDataAfterRepartitioning.count("cnt")
+                .to(stringSerde, longSerde, "cnt");
+        }
+
         return new KafkaStreams(builder, props);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index a1b813b..af047a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -40,13 +40,13 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -68,7 +68,7 @@ public class EosTestDriver extends SmokeTestUtil {
         });
 
         final Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -105,10 +105,10 @@ public class EosTestDriver extends SmokeTestUtil {
         System.out.println(numRecordsProduced + " records produced");
     }
 
-    public static void verify(final String kafka) {
+    public static void verify(final String kafka, final boolean withRepartitioning) {
         ensureStreamsApplicationDown(kafka);
 
-        final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);
+        final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka, withRepartitioning);
 
         final Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
@@ -117,27 +117,69 @@ public class EosTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
 
+        final String[] allInputTopics;
+        final String[] allOutputTopics;
+        if (withRepartitioning) {
+            allInputTopics = new String[] {"data", "repartition"};
+            allOutputTopics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
+        } else {
+            allInputTopics = new String[] {"data"};
+            allOutputTopics = new String[] {"echo", "min", "sum"};
+        }
+
+        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> inputRecordsPerTopicPerPartition;
         try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
-            final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
+            final List<TopicPartition> partitions = getAllPartitions(consumer, allInputTopics);
             consumer.assign(partitions);
             consumer.seekToBeginning(partitions);
 
-            final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition
-                = getOutputRecords(consumer, committedOffsets);
+            inputRecordsPerTopicPerPartition = getRecords(consumer, committedOffsets, withRepartitioning, true);
+        } catch (final Exception e) {
+            e.printStackTrace(System.err);
+            System.out.println("FAILED");
+            return;
+        }
 
-            truncate("data", recordPerTopicPerPartition, committedOffsets);
+        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> outputRecordsPerTopicPerPartition;
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
 
-            verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
-            verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));
+            outputRecordsPerTopicPerPartition = getRecords(consumer, consumer.endOffsets(partitions), withRepartitioning, false);
+        } catch (final Exception e) {
+            e.printStackTrace(System.err);
+            System.out.println("FAILED");
+            return;
+        }
+
+        verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("echo"));
+        if (withRepartitioning) {
+            verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("repartition"));
+        }
+
+        verifyMin(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("min"));
+        verifySum(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("sum"));
 
-            verifyAllTransactionFinished(consumer, kafka);
+        if (withRepartitioning) {
+            verifyMax(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("max"));
+            verifyCnt(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("cnt"));
+        }
 
-            // do not modify: required test output
-            System.out.println("ALL-RECORDS-DELIVERED");
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            verifyAllTransactionFinished(consumer, kafka, withRepartitioning);
         } catch (final Exception e) {
             e.printStackTrace(System.err);
             System.out.println("FAILED");
+            return;
         }
+
+        // do not modify: required test output
+        System.out.println("ALL-RECORDS-DELIVERED");
     }
 
     private static void ensureStreamsApplicationDown(final String kafka) {
@@ -159,7 +201,8 @@ public class EosTestDriver extends SmokeTestUtil {
         }
     }
 
-    private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka) {
+    private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka,
+                                                                 final boolean withRepartitioning) {
         final Properties props = new Properties();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
@@ -171,6 +214,9 @@ public class EosTestDriver extends SmokeTestUtil {
         try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
             final Set<String> topics = new HashSet<>();
             topics.add("data");
+            if (withRepartitioning) {
+                topics.add("repartition");
+            }
             consumer.subscribe(topics);
             consumer.poll(0);
 
@@ -190,8 +236,10 @@ public class EosTestDriver extends SmokeTestUtil {
         return committedOffsets;
     }
 
-    private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
-                                                                                                           final Map<TopicPartition, Long> committedOffsets) {
+    private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getRecords(final KafkaConsumer<byte[], byte[]> consumer,
+                                                                                                     final Map<TopicPartition, Long> readEndOffsets,
+                                                                                                     final boolean withRepartitioning,
+                                                                                                     final boolean isInputTopic) {
         final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -201,16 +249,20 @@ public class EosTestDriver extends SmokeTestUtil {
 
             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-                addRecord(record, recordPerTopicPerPartition);
+                final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+                final long readEndOffset = readEndOffsets.get(tp);
+                if (record.offset() < readEndOffset) {
+                    addRecord(record, recordPerTopicPerPartition, withRepartitioning);
+                } else if (!isInputTopic) {
+                    throw new RuntimeException("FAIL: did receive more records than expected for " + tp
+                        + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
+                }
+                if (consumer.position(tp) >= readEndOffset) {
+                    consumer.pause(Collections.singletonList(tp));
+                }
             }
 
-            if (receivedRecords.count() > 0) {
-                allRecordsReceived =
-                    receivedAllRecords(
-                        recordPerTopicPerPartition.get("data"),
-                        recordPerTopicPerPartition.get("echo"),
-                        committedOffsets);
-            }
+            allRecordsReceived = consumer.paused().size() == readEndOffsets.keySet().size();
         }
 
         if (!allRecordsReceived) {
@@ -221,16 +273,13 @@ public class EosTestDriver extends SmokeTestUtil {
     }
 
     private static void addRecord(final ConsumerRecord<byte[], byte[]> record,
-                                  final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition) {
+                                  final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
+                                  final boolean withRepartitioning) {
 
         final String topic = record.topic();
         final TopicPartition partition = new TopicPartition(topic, record.partition());
 
-        if ("data".equals(topic)
-            || "echo".equals(topic)
-            || "min".equals(topic)
-            || "sum".equals(topic)) {
-
+        if (verifyTopic(topic, withRepartitioning)) {
             Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition
                 = recordPerTopicPerPartition.get(topic);
 
@@ -250,156 +299,218 @@ public class EosTestDriver extends SmokeTestUtil {
         }
     }
 
-    private static boolean receivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> supersetExpectedRecords,
-                                              final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords,
-                                              final Map<TopicPartition, Long> committedOffsets) {
-        if (supersetExpectedRecords == null
-            || receivedRecords == null
-            || supersetExpectedRecords.keySet().size() < committedOffsets.keySet().size()
-            || receivedRecords.keySet().size() < committedOffsets.keySet().size()) {
+    private static boolean verifyTopic(final String topic,
+                                       final boolean withRepartitioning) {
+        final boolean validTopic = "data".equals(topic) || "echo".equals(topic) || "min".equals(topic) || "sum".equals(topic);
 
-            return false;
+        if (withRepartitioning) {
+            return validTopic || "repartition".equals(topic) || "max".equals(topic) || "cnt".equals(topic);
         }
 
+        return validTopic;
+    }
+
+    private static void verifyReceivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> expectedRecords,
+                                                 final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords) {
+        if (expectedRecords.size() != receivedRecords.size()) {
+            throw new RuntimeException("Result verification failed. Received " + receivedRecords.size() + " records but expected " + expectedRecords.size());
+        }
+
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
         for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
-            final TopicPartition tp = partitionRecords.getKey();
-            final int numberOfReceivedRecords = partitionRecords.getValue().size();
-            final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
-            if (committed != null) {
-                if (numberOfReceivedRecords < committed) {
-                    return false;
+            final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = expectedRecords.get(inputTopicPartition).iterator();
+
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
+                final ConsumerRecord<byte[], byte[]> expected = expectedRecord.next();
+
+                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
+                final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
+
+                if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
+                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
-            } else if (numberOfReceivedRecords > 0) {
-                throw new RuntimeException("Result verification failed for partition " + tp
-                    + ". No offset was committed but we received " + numberOfReceivedRecords + " records.");
             }
         }
+    }
 
+    private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
         final StringDeserializer stringDeserializer = new StringDeserializer();
         final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
-            try {
-                final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
-                final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecords = supersetExpectedRecords.get(inputTopicPartition).iterator();
 
-                for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
-                    final ConsumerRecord<byte[], byte[]> expected = expectedRecords.next();
+        final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
+            final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+            final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
+            final List<ConsumerRecord<byte[], byte[]>> partitionMin = partitionRecords.getValue();
 
-                    final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
-                    final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
-                    final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
-                    final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
+            if (partitionInput.size() != partitionMin.size()) {
+                throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
+                    +  partitionRecords.getKey() + " but received " + partitionMin.size());
+            }
 
-                    if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
-                        throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
-                    }
+            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
+
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionMin) {
+                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+
+                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                final String key = stringDeserializer.deserialize(input.topic(), input.key());
+                final int value = integerDeserializer.deserialize(input.topic(), input.value());
+
+                Integer min = currentMinPerKey.get(key);
+                if (min == null) {
+                    min = value;
+                } else {
+                    min = Math.min(min, value);
+                }
+                currentMinPerKey.put(key, min);
+
+                if (!receivedKey.equals(key) || receivedValue != min) {
+                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
-            } catch (final NullPointerException | NoSuchElementException e) {
-                return false;
             }
         }
-
-        return true;
     }
 
-    private static void truncate(final String topic,
-                                 final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
-                                 final Map<TopicPartition, Long> committedOffsets) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecords = recordPerTopicPerPartition.get(topic);
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> truncatedTopicRecords = recordPerTopicPerPartition.get(topic);
+    private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
+        final LongDeserializer longDeserializer = new LongDeserializer();
 
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : topicRecords.entrySet()) {
-            final TopicPartition tp = partitionRecords.getKey();
-            final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
-            truncatedTopicRecords.put(tp, partitionRecords.getValue().subList(0, committed != null ? committed.intValue() : 0));
-        }
+        final HashMap<String, Long> currentSumPerKey = new HashMap<>();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
+            final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+            final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
+            final List<ConsumerRecord<byte[], byte[]>> partitionSum = partitionRecords.getValue();
+
+            if (partitionInput.size() != partitionSum.size()) {
+                throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
+                    +  partitionRecords.getKey() + " but received " + partitionSum.size());
+            }
+
+            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
+
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionSum) {
+                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
 
-        recordPerTopicPerPartition.put(topic, truncatedTopicRecords);
+                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                final String key = stringDeserializer.deserialize(input.topic(), input.key());
+                final int value = integerDeserializer.deserialize(input.topic(), input.value());
+
+                Long sum = currentSumPerKey.get(key);
+                if (sum == null) {
+                    sum = (long) value;
+                } else {
+                    sum += value;
+                }
+                currentSumPerKey.put(key, sum);
+
+                if (!receivedKey.equals(key) || receivedValue != sum) {
+                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">");
+                }
+            }
+        }
     }
 
-    private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
-                                     final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
+    private static void verifyMax(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> maxPerTopicPerPartition) {
         final StringDeserializer stringDeserializer = new StringDeserializer();
         final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
 
         final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
-            try {
-                final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
-                final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition).iterator();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : maxPerTopicPerPartition.entrySet()) {
+            final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
+            final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
+            final List<ConsumerRecord<byte[], byte[]>> partitionMax = partitionRecords.getValue();
+
+            if (partitionInput.size() != partitionMax.size()) {
+                throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
+                    +  partitionRecords.getKey() + " but received " + partitionMax.size());
+            }
 
-                for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
-                    final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
 
-                    final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
-                    final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
-                    final String key = stringDeserializer.deserialize(input.topic(), input.key());
-                    final int value = integerDeserializer.deserialize(input.topic(), input.value());
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionMax) {
+                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
 
+                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                final String key = stringDeserializer.deserialize(input.topic(), input.key());
+                final int value = integerDeserializer.deserialize(input.topic(), input.value());
 
-                    Integer min = currentMinPerKey.get(key);
-                    if (min == null) {
-                        min = value;
-                    } else {
-                        min = Math.min(min, value);
-                    }
-                    currentMinPerKey.put(key, min);
 
-                    if (!receivedKey.equals(key) || receivedValue != min) {
-                        throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">");
-                    }
+                Integer max = currentMinPerKey.get(key);
+                if (max == null) {
+                    max = Integer.MIN_VALUE;
+                }
+                max = Math.max(max, value);
+                currentMinPerKey.put(key, max);
+
+                if (!receivedKey.equals(key) || receivedValue != max.intValue()) {
+                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + max + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
-            } catch (final NullPointerException e) {
-                System.err.println(inputPerTopicPerPartition);
-                e.printStackTrace(System.err);
-                throw e;
             }
         }
     }
 
-    private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
-                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
+    private static void verifyCnt(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
         final StringDeserializer stringDeserializer = new StringDeserializer();
         final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
         final LongDeserializer longDeserializer = new LongDeserializer();
 
         final HashMap<String, Long> currentSumPerKey = new HashMap<>();
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
-            try {
-                final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
-                final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition).iterator();
-
-                for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
-                    final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
-
-                    final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
-                    final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
-                    final String key = stringDeserializer.deserialize(input.topic(), input.key());
-                    final int value = integerDeserializer.deserialize(input.topic(), input.value());
-
-                    Long sum = currentSumPerKey.get(key);
-                    if (sum == null) {
-                        sum = (long) value;
-                    } else {
-                        sum += value;
-                    }
-                    currentSumPerKey.put(key, sum);
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : cntPerTopicPerPartition.entrySet()) {
+            final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
+            final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
+            final List<ConsumerRecord<byte[], byte[]>> partitionCnt = partitionRecords.getValue();
+
+            if (partitionInput.size() != partitionCnt.size()) {
+                throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
+                    +  partitionRecords.getKey() + " but received " + partitionCnt.size());
+            }
 
-                    if (!receivedKey.equals(key) || receivedValue != sum) {
-                        throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">");
-                    }
+            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
+
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionCnt) {
+                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+
+                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                final String key = stringDeserializer.deserialize(input.topic(), input.key());
+
+                Long cnt = currentSumPerKey.get(key);
+                if (cnt == null) {
+                    cnt = 0L;
+                }
+                currentSumPerKey.put(key, ++cnt);
+
+                if (!receivedKey.equals(key) || receivedValue != cnt.longValue()) {
+                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + receivedKey + "," + receivedValue + ">");
                 }
-            } catch (final NullPointerException e) {
-                System.err.println(inputPerTopicPerPartition);
-                e.printStackTrace(System.err);
-                throw e;
             }
         }
     }
 
     private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
-                                                     final String kafka) {
-        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "min", "sum");
+                                                     final String kafka,
+                                                     final boolean withRepartitioning) {
+        final String[] topics;
+        if (withRepartitioning) {
+            topics = new String[] {"echo", "min", "sum", "repartition", "max", "min"};
+        } else {
+            topics = new String[] {"echo", "min", "sum"};
+        }
+
+        final List<TopicPartition> partitions = getAllPartitions(consumer, topics);
         consumer.assign(partitions);
         consumer.seekToEnd(partitions);
         consumer.poll(0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 309c90d..150ec7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -32,31 +32,32 @@ import java.io.File;
 
 public class SmokeTestUtil {
 
-    public final static int WINDOW_SIZE = 100;
-    public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
-    public final static int END = Integer.MAX_VALUE;
+    final static int END = Integer.MAX_VALUE;
 
-    public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
+    static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, false);
     }
 
-    public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final boolean printOffset) {
+    private static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final boolean printOffset) {
         return new ProcessorSupplier<Object, Object>() {
+            @Override
             public Processor<Object, Object> get() {
                 return new AbstractProcessor<Object, Object>() {
                     private int numRecordsProcessed = 0;
                     private ProcessorContext context;
 
                     @Override
-                    public void init(ProcessorContext context) {
+                    public void init(final ProcessorContext context) {
                         System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         numRecordsProcessed = 0;
                         this.context = context;
                     }
 
                     @Override
-                    public void process(Object key, Object value) {
-                        if (printOffset) System.out.println(">>> " + context.offset());
+                    public void process(final Object key, final Object value) {
+                        if (printOffset) {
+                            System.out.println(">>> " + context.offset());
+                        }
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
@@ -64,29 +65,28 @@ public class SmokeTestUtil {
                     }
 
                     @Override
-                    public void punctuate(long timestamp) {
-                    }
+                    public void punctuate(final long timestamp) { }
 
                     @Override
-                    public void close() {
-                    }
+                    public void close() { }
                 };
             }
         };
     }
 
     public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
-        public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
-            return new KeyValue<K, V>(winKey.key(), value);
+        @Override
+        public KeyValue<K, V> apply(final Windowed<K> winKey, final V value) {
+            return new KeyValue<>(winKey.key(), value);
         }
     }
 
     public static class Agg {
 
-        public KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
+        KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
             return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
                 @Override
-                public KeyValue<String, Long> apply(String key, Long value) {
+                public KeyValue<String, Long> apply(final String key, final Long value) {
                     return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
                 }
             };
@@ -101,19 +101,19 @@ public class SmokeTestUtil {
             };
         }
 
-        public Aggregator<String, Long, Long> adder() {
+        Aggregator<String, Long, Long> adder() {
             return new Aggregator<String, Long, Long>() {
                 @Override
-                public Long apply(String aggKey, Long value, Long aggregate) {
+                public Long apply(final String aggKey, final Long value, final Long aggregate) {
                     return aggregate + value;
                 }
             };
         }
 
-        public Aggregator<String, Long, Long> remover() {
+        Aggregator<String, Long, Long> remover() {
             return new Aggregator<String, Long, Long>() {
                 @Override
-                public Long apply(String aggKey, Long value, Long aggregate) {
+                public Long apply(final String aggKey, final Long value, final Long aggregate) {
                     return aggregate - value;
                 }
             };
@@ -124,33 +124,22 @@ public class SmokeTestUtil {
 
     public static Serde<Integer> intSerde = Serdes.Integer();
 
-    public static Serde<Long> longSerde = Serdes.Long();
+    static Serde<Long> longSerde = Serdes.Long();
 
-    public static Serde<Double> doubleSerde = Serdes.Double();
+    static Serde<Double> doubleSerde = Serdes.Double();
 
-    public static File createDir(String path) throws Exception {
-        File dir = new File(path);
+    static File createDir(final File parent, final String child) throws Exception {
+        final File dir = new File(parent, child);
 
         dir.mkdir();
 
         return dir;
     }
 
-    public static File createDir(File parent, String child) throws Exception {
-        File dir = new File(parent, child);
-
-        dir.mkdir();
-
-        return dir;
-    }
-
-    public static void sleep(long duration) {
+    public static void sleep(final long duration) {
         try {
             Thread.sleep(duration);
-        } catch (Exception ex) {
-            //
-        }
+        } catch (final Exception ignore) { }
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index 58499aa..d508286 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -30,9 +30,9 @@ public class StreamsEosTest {
         final String command = args.length > 2 ? args[2] : null;
 
         System.out.println("StreamsTest instance started");
-        System.out.println("command=" + command);
         System.out.println("kafka=" + kafka);
         System.out.println("stateDir=" + stateDir);
+        System.out.println("command=" + command);
 
         if (command == null || stateDir == null) {
             System.exit(-1);
@@ -43,11 +43,16 @@ public class StreamsEosTest {
                 EosTestDriver.generate(kafka);
                 break;
             case "process":
-                final EosTestClient client = new EosTestClient(new File(stateDir), kafka);
-                client.start();
+                new EosTestClient(kafka, new File(stateDir), false).start();
+                break;
+            case "process-complex":
+                new EosTestClient(kafka, new File(stateDir), true).start();
                 break;
             case "verify":
-                EosTestDriver.verify(kafka);
+                EosTestDriver.verify(kafka, false);
+                break;
+            case "verify-complex":
+                EosTestDriver.verify(kafka, true);
                 break;
             default:
                 System.out.println("unknown command: " + command);

http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index f6b6dd4..a0d9c57 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -181,11 +181,21 @@ class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
         super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
 
 
+class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
+
+
 class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
 
 
+class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex")
+
+
 class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, "close-deadlock-test")

http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/tests/kafkatest/tests/streams/streams_eos_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 305cde0..be2c4f8 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -16,7 +16,8 @@
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, StreamsEosTestVerifyRunnerService
+from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
+    StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
 import time
 
 
@@ -30,76 +31,99 @@ class StreamsEosTest(KafkaTest):
             'data' : { 'partitions': 5, 'replication-factor': 2 },
             'echo' : { 'partitions': 5, 'replication-factor': 2 },
             'min' : { 'partitions': 5, 'replication-factor': 2 },
-            'sum' : { 'partitions': 5, 'replication-factor': 2 }
+            'sum' : { 'partitions': 5, 'replication-factor': 2 },
+            'repartition' : { 'partitions': 5, 'replication-factor': 2 },
+            'max' : { 'partitions': 5, 'replication-factor': 2 },
+            'cnt' : { 'partitions': 5, 'replication-factor': 2 }
         })
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
-        self.processor1 = StreamsEosTestJobRunnerService(test_context, self.kafka)
-        self.processor2 = StreamsEosTestJobRunnerService(test_context, self.kafka)
-        self.verifier = StreamsEosTestVerifyRunnerService(test_context, self.kafka)
+        self.test_context = test_context
 
     @cluster(num_nodes=8)
-    def test_rebalance(self):
+    def test_rebalance_simple(self):
+        self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+                           StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+                           StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
+
+    @cluster(num_nodes=8)
+    def test_rebalance_complex(self):
+        self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+                           StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
+
+    def run_rebalance(self, processor1, processor2, verifier):
         """
         Starts and stops two test clients a few times.
         Ensure that all records are delivered exactly-once.
         """
 
         self.driver.start()
-        self.processor1.start()
+        processor1.start()
 
-        time.sleep(30)
+        time.sleep(120)
 
-        self.processor2.start()
+        processor2.start()
 
-        time.sleep(30)
-        self.processor1.stop()
+        time.sleep(120)
+        processor1.stop()
 
-        time.sleep(30)
-        self.processor1.start()
+        time.sleep(120)
+        processor1.start()
 
-        time.sleep(30)
-        self.processor2.stop()
+        time.sleep(120)
+        processor2.stop()
 
-        time.sleep(30)
+        time.sleep(120)
 
         self.driver.stop()
 
-        self.processor1.stop()
-        self.processor2.stop()
+        processor1.stop()
+        processor2.stop()
 
-        self.verifier.start()
-        self.verifier.wait()
+        verifier.start()
+        verifier.wait()
 
-        self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)
+        verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
     @cluster(num_nodes=8)
     def test_failure_and_recovery(self):
+        self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+                                      StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
+
+    @cluster(num_nodes=8)
+    def test_failure_and_recovery_complex(self):
+        self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+                                      StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
+
+    def run_failure_and_recovery(self, processor1, processor2, verifier):
         """
         Starts two test clients, then abort (kill -9) and restart them a few times.
         Ensure that all records are delivered exactly-once.
         """
 
         self.driver.start()
-        self.processor1.start()
-        self.processor2.start()
+        processor1.start()
+        processor2.start()
 
-        time.sleep(30)
-        self.processor1.abortThenRestart()
+        time.sleep(120)
+        processor1.abortThenRestart()
 
-        time.sleep(30)
-        self.processor1.abortThenRestart()
+        time.sleep(120)
+        processor1.abortThenRestart()
 
-        time.sleep(30)
-        self.processor2.abortThenRestart()
+        time.sleep(120)
+        processor2.abortThenRestart()
 
-        time.sleep(30)
+        time.sleep(120)
 
         self.driver.stop()
 
-        self.processor1.stop()
-        self.processor2.stop()
+        processor1.stop()
+        processor2.stop()
 
-        self.verifier.start()
-        self.verifier.wait()
+        verifier.start()
+        verifier.wait()
 
-        self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)
+        verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)