Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/08 21:08:57 UTC

kafka git commit: KAFKA-5362: Add EOS system tests for Streams API

Repository: kafka
Updated Branches:
  refs/heads/trunk 21194a63e -> ba07d828c


KAFKA-5362: Add EOS system tests for Streams API

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3201 from mjsax/kafka-5362-add-eos-system-tests-for-streams-api


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba07d828
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba07d828
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba07d828

Branch: refs/heads/trunk
Commit: ba07d828c545d542cc11db60249e660d88a20fbb
Parents: 21194a6
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jun 8 14:08:54 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 8 14:08:54 2017 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   4 +
 .../processor/internals/StreamThread.java       |   4 +-
 .../kafka/streams/tests/EosTestClient.java      | 150 ++++++
 .../kafka/streams/tests/EosTestDriver.java      | 470 +++++++++++++++++++
 .../kafka/streams/tests/StreamsEosTest.java     |  57 +++
 tests/kafkatest/services/streams.py             |  25 +
 .../kafkatest/tests/streams/streams_eos_test.py | 105 +++++
 7 files changed, 813 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 26f4a77..19f3a4d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -197,6 +197,10 @@
       <allow pkg="org.I0Itec.zkclient" />
     </subpackage>
 
+    <subpackage name="test">
+      <allow pkg="kafka.admin" />
+    </subpackage>
+
     <subpackage name="state">
       <allow pkg="org.rocksdb" />
     </subpackage>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 624a15e..23ce958 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -548,7 +548,7 @@ public class StreamThread extends Thread {
             timerStartedMs = time.milliseconds();
 
             // try to fetch some records if necessary
-            final ConsumerRecords<byte[], byte[]> records = pollRequests(pollTimeMs);
+            final ConsumerRecords<byte[], byte[]> records = pollRequests();
             if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
                 streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
                 addRecordsToTasks(records);
@@ -573,7 +573,7 @@ public class StreamThread extends Thread {
      * Get the next batch of records by polling.
      * @return Next batch of records or null if no records available.
      */
-    private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
+    private ConsumerRecords<byte[], byte[]> pollRequests() {
         ConsumerRecords<byte[], byte[]> records = null;
 
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
new file mode 100644
index 0000000..fe5dc64
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class EosTestClient extends SmokeTestUtil {
+
+    static final String APP_ID = "EosTest";
+    private final String kafka;
+    private final File stateDir;
+    private KafkaStreams streams;
+    private boolean uncaughtException;
+
+    EosTestClient(final File stateDir, final String kafka) {
+        super();
+        this.stateDir = stateDir;
+        this.kafka = kafka;
+    }
+
+    private boolean isRunning = true;
+
+    public void start() {
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                isRunning = false;
+                streams.close(5, TimeUnit.SECONDS);
+                // do not remove these printouts since they are needed for health scripts
+                if (!uncaughtException) {
+                    System.out.println("EOS-TEST-CLIENT-CLOSED");
+                }
+            }
+        }));
+
+        while (isRunning) {
+            if (streams == null) {
+                uncaughtException = false;
+
+                streams = createKafkaStreams(stateDir, kafka);
+                streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                    @Override
+                    public void uncaughtException(final Thread t, final Throwable e) {
+                        System.out.println("EOS-TEST-CLIENT-EXCEPTION");
+                        e.printStackTrace();
+                        uncaughtException = true;
+                    }
+                });
+                streams.start();
+            }
+            if (uncaughtException) {
+                streams.close(5, TimeUnit.SECONDS);
+                streams = null;
+            }
+            sleep(1000);
+        }
+    }
+
+    private static KafkaStreams createKafkaStreams(final File stateDir,
+                                                   final String kafka) {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, Integer> data = builder.stream("data");
+
+        data.to("echo");
+        data.process(SmokeTestUtil.printProcessorSupplier("data"));
+
+        final KGroupedStream<String, Integer> groupedData = data.groupByKey();
+        // min
+        groupedData
+            .aggregate(
+                new Initializer<Integer>() {
+                    @Override
+                    public Integer apply() {
+                        return Integer.MAX_VALUE;
+                    }
+                },
+                new Aggregator<String, Integer, Integer>() {
+                    @Override
+                    public Integer apply(final String aggKey,
+                                         final Integer value,
+                                         final Integer aggregate) {
+                        return (value < aggregate) ? value : aggregate;
+                    }
+                },
+                intSerde,
+                "min")
+            .to(stringSerde, intSerde, "min");
+
+        // sum
+        groupedData.aggregate(
+            new Initializer<Long>() {
+                @Override
+                public Long apply() {
+                    return 0L;
+                }
+            },
+            new Aggregator<String, Integer, Long>() {
+                @Override
+                public Long apply(final String aggKey,
+                                  final Integer value,
+                                  final Long aggregate) {
+                    return (long) value + aggregate;
+                }
+            },
+            longSerde,
+            "sum")
+            .to(stringSerde, longSerde, "sum");
+
+        return new KafkaStreams(builder, props);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
new file mode 100644
index 0000000..07a0b1a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import kafka.admin.AdminClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+public class EosTestDriver extends SmokeTestUtil {
+
+    private static final int MAX_NUMBER_OF_KEYS = 100;
+    private static final long MAX_IDLE_TIME_MS = 300000L;
+
+    private static boolean isRunning = true;
+
+    static void generate(final String kafka) throws Exception {
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                isRunning = false;
+            }
+        });
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+
+        final KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps);
+
+        final Random rand = new Random(System.currentTimeMillis());
+
+        int numRecordsProduced = 0;
+        while (isRunning) {
+            final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
+            final int value = rand.nextInt(10000);
+
+            final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
+
+            producer.send(record, new Callback() {
+                @Override
+                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                    if (exception != null) {
+                        exception.printStackTrace();
+                        Exit.exit(1);
+                    }
+                }
+            });
+
+            numRecordsProduced++;
+            if (numRecordsProduced % 1000 == 0) {
+                System.out.println(numRecordsProduced + " records produced");
+            }
+            Utils.sleep(rand.nextInt(50));
+        }
+        producer.close();
+        System.out.println(numRecordsProduced + " records produced");
+    }
+
+    public static void verify(final String kafka) {
+        ensureStreamsApplicationDown(kafka);
+
+        final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);
+
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition
+                = getOutputRecords(consumer, committedOffsets);
+
+            truncate("data", recordPerTopicPerPartition, committedOffsets);
+
+            verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
+            verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));
+
+            verifyAllTransactionFinished(consumer, kafka);
+
+            // do not modify: required test output
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } catch (final Exception e) {
+            e.printStackTrace(System.err);
+            System.out.println("FAILED");
+        }
+    }
+
+    private static void ensureStreamsApplicationDown(final String kafka) {
+        AdminClient adminClient = null;
+        try {
+            adminClient = AdminClient.createSimplePlaintext(kafka);
+
+            final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+            while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
+                if (System.currentTimeMillis() > maxWaitTime) {
+                    throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
+                }
+                sleep(1000);
+            }
+        } finally {
+            if (adminClient != null) {
+                adminClient.close();
+            }
+        }
+    }
+
+    private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+
+        final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            final Set<String> topics = new HashSet<>();
+            topics.add("data");
+            consumer.subscribe(topics);
+            consumer.poll(0);
+
+            final Set<TopicPartition> partitions = new HashSet<>();
+            for (final String topic : topics) {
+                for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
+                    partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+                }
+            }
+
+            for (final TopicPartition tp : partitions) {
+                final long offset = consumer.position(tp);
+                committedOffsets.put(tp, offset);
+            }
+        }
+
+        return committedOffsets;
+    }
+
+    private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
+                                                                                                           final Map<TopicPartition, Long> committedOffsets) {
+        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
+
+        long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        boolean allRecordsReceived = false;
+        while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
+            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);
+
+            for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
+                maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+                addRecord(record, recordPerTopicPerPartition);
+            }
+
+            if (receivedRecords.count() > 0) {
+                allRecordsReceived =
+                    receivedAllRecords(
+                        recordPerTopicPerPartition.get("data"),
+                        recordPerTopicPerPartition.get("echo"),
+                        committedOffsets);
+            }
+        }
+
+        if (!allRecordsReceived) {
+            throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000) + " sec idle time.");
+        }
+
+        return recordPerTopicPerPartition;
+    }
+
+    private static void addRecord(final ConsumerRecord<byte[], byte[]> record,
+                                  final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition) {
+
+        final String topic = record.topic();
+        final TopicPartition partition = new TopicPartition(topic, record.partition());
+
+        if ("data".equals(topic)
+            || "echo".equals(topic)
+            || "min".equals(topic)
+            || "sum".equals(topic)) {
+
+            Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition
+                = recordPerTopicPerPartition.get(topic);
+
+            if (topicRecordsPerPartition == null) {
+                topicRecordsPerPartition = new HashMap<>();
+                recordPerTopicPerPartition.put(topic, topicRecordsPerPartition);
+            }
+
+            List<ConsumerRecord<byte[], byte[]>> records = topicRecordsPerPartition.get(partition);
+            if (records == null) {
+                records = new ArrayList<>();
+                topicRecordsPerPartition.put(partition, records);
+            }
+            records.add(record);
+        } else {
+            throw new RuntimeException("FAIL: received data from unexpected topic: " + record);
+        }
+    }
+
+    private static boolean receivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> supersetExpectedRecords,
+                                              final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords,
+                                              final Map<TopicPartition, Long> committedOffsets) {
+        if (supersetExpectedRecords == null
+            || receivedRecords == null
+            || supersetExpectedRecords.keySet().size() < committedOffsets.keySet().size()
+            || receivedRecords.keySet().size() < committedOffsets.keySet().size()) {
+
+            return false;
+        }
+
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
+            final TopicPartition tp = partitionRecords.getKey();
+            final int numberOfReceivedRecords = partitionRecords.getValue().size();
+            final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
+            if (committed != null) {
+                if (numberOfReceivedRecords < committed) {
+                    return false;
+                }
+            } else if (numberOfReceivedRecords > 0) {
+                throw new RuntimeException("Result verification failed for partition " + tp
+                    + ". No offset was committed but we received " + numberOfReceivedRecords + " records.");
+            }
+        }
+
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
+            try {
+                final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+                final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecords = supersetExpectedRecords.get(inputTopicPartition).iterator();
+
+                for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
+                    final ConsumerRecord<byte[], byte[]> expected = expectedRecords.next();
+
+                    final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                    final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                    final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
+                    final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
+
+                    if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
+                        throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
+                    }
+                }
+            } catch (final NullPointerException | NoSuchElementException e) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static void truncate(final String topic,
+                                 final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
+                                 final Map<TopicPartition, Long> committedOffsets) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecords = recordPerTopicPerPartition.get(topic);
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> truncatedTopicRecords = new HashMap<>();
+
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : topicRecords.entrySet()) {
+            final TopicPartition tp = partitionRecords.getKey();
+            final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
+            truncatedTopicRecords.put(tp, partitionRecords.getValue().subList(0, committed != null ? committed.intValue() : 0));
+        }
+
+        recordPerTopicPerPartition.put(topic, truncatedTopicRecords);
+    }
+
+    private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                     final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
+
+        final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
+            try {
+                final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+                final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition).iterator();
+
+                for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
+                    final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+
+                    final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                    final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                    final String key = stringDeserializer.deserialize(input.topic(), input.key());
+                    final Integer value = integerDeserializer.deserialize(input.topic(), input.value());
+
+
+                    Integer min = currentMinPerKey.get(key);
+                    if (min == null) {
+                        min = value;
+                    }
+                    min = Math.min(min, value);
+                    currentMinPerKey.put(key, min);
+
+                    if (!receivedKey.equals(key) || receivedValue != min) {
+                        throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">");
+                    }
+                }
+            } catch (final NullPointerException e) {
+                System.err.println(inputPerTopicPerPartition);
+                e.printStackTrace(System.err);
+                throw e;
+            }
+        }
+    }
+
+    private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> sumPerTopicPerPartition) {
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
+        final LongDeserializer longDeserializer = new LongDeserializer();
+
+        final HashMap<String, Long> currentSumPerKey = new HashMap<>();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : sumPerTopicPerPartition.entrySet()) {
+            try {
+                final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+                final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition).iterator();
+
+                for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
+                    final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+
+                    final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                    final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                    final String key = stringDeserializer.deserialize(input.topic(), input.key());
+                    final Integer value = integerDeserializer.deserialize(input.topic(), input.value());
+
+                    Long sum = currentSumPerKey.get(key);
+                    if (sum == null) {
+                        sum = 0L;
+                    }
+                    sum += value;
+                    currentSumPerKey.put(key, sum);
+
+                    if (!receivedKey.equals(key) || receivedValue != sum) {
+                        throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">");
+                    }
+                }
+            } catch (final NullPointerException e) {
+                System.err.println(inputPerTopicPerPartition);
+                e.printStackTrace(System.err);
+                throw e;
+            }
+        }
+    }
+
+    private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
+                                                     final String kafka) {
+        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "min", "sum");
+        consumer.assign(partitions);
+        consumer.seekToEnd(partitions);
+        consumer.poll(0);
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "VerifyProducer");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+
+        try (final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            for (final TopicPartition tp : partitions) {
+                final ProducerRecord<String, String> record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value");
+
+                producer.send(record, new Callback() {
+                    @Override
+                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                        if (exception != null) {
+                            exception.printStackTrace();
+                            Exit.exit(1);
+                        }
+                    }
+                });
+            }
+        }
+
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+
+        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
+            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            for (final ConsumerRecord<byte[], byte[]> record : records) {
+                final String topic = record.topic();
+                final TopicPartition tp = new TopicPartition(topic, record.partition());
+
+                try {
+                    final String key = stringDeserializer.deserialize(topic, record.key());
+                    final String value = stringDeserializer.deserialize(topic, record.value());
+
+                    if (!("key".equals(key) && "value".equals(value) && partitions.remove(tp))) {
+                        throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
+                            "Expected record <'key','value'> from one of " + partitions + " but got"
+                            + " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]");
+                    }
+                } catch (final SerializationException e) {
+                    throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
+                        "Expected record <'key','value'> from one of " + partitions + " but got " + record, e);
+                }
+
+            }
+        }
+        if (!partitions.isEmpty()) {
+            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000) + " sec.");
+        }
+    }
+
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer,
+                                                         final String... topics) {
+        final ArrayList<TopicPartition> partitions = new ArrayList<>();
+
+        for (final String topic : topics) {
+            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
+                partitions.add(new TopicPartition(info.topic(), info.partition()));
+            }
+        }
+        return partitions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
new file mode 100644
index 0000000..58499aa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import java.io.File;
+
+public class StreamsEosTest {
+
+    /**
+     *  args ::= kafka stateDir command
+     *  command := "run" | "process" | "verify"
+     */
+    public static void main(final String[] args) throws Exception {
+        final String kafka = args[0];
+        final String stateDir = args.length > 1 ? args[1] : null;
+        final String command = args.length > 2 ? args[2] : null;
+
+        System.out.println("StreamsTest instance started");
+        System.out.println("command=" + command);
+        System.out.println("kafka=" + kafka);
+        System.out.println("stateDir=" + stateDir);
+
+        if (command == null || stateDir == null) {
+            System.exit(-1);
+        }
+
+        switch (command) {
+            case "run":
+                EosTestDriver.generate(kafka);
+                break;
+            case "process":
+                final EosTestClient client = new EosTestClient(new File(stateDir), kafka);
+                client.start();
+                break;
+            case "verify":
+                EosTestDriver.verify(kafka);
+                break;
+            default:
+                System.out.println("unknown command: " + command);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 905320a..867e3f5 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -150,6 +150,16 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
                                                           command)
 
 
+class StreamsEosTestBaseService(StreamsTestBaseService):
+    """Base class for Streams EOS Test services providing some common settings and functionality"""
+
+    def __init__(self, test_context, kafka, command):
+        super(StreamsEosTestBaseService, self).__init__(test_context,
+                                                        kafka,
+                                                        "org.apache.kafka.streams.tests.StreamsEosTest",
+                                                        command)
+
+
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
@@ -160,6 +170,21 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
         super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process")
 
 
+class StreamsEosTestDriverService(StreamsEosTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")
+
+
+class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
+
+
+class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
+
+
 class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, "close-deadlock-test")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/tests/kafkatest/tests/streams/streams_eos_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
new file mode 100644
index 0000000..305cde0
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, StreamsEosTestVerifyRunnerService
+import time
+
+
+class StreamsEosTest(KafkaTest):
+    """
+    Test of Kafka Streams exactly-once semantics
+    """
+
+    def __init__(self, test_context):
+        super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
+            'data' : { 'partitions': 5, 'replication-factor': 2 },
+            'echo' : { 'partitions': 5, 'replication-factor': 2 },
+            'min' : { 'partitions': 5, 'replication-factor': 2 },
+            'sum' : { 'partitions': 5, 'replication-factor': 2 }
+        })
+        self.driver = StreamsEosTestDriverService(test_context, self.kafka)
+        self.processor1 = StreamsEosTestJobRunnerService(test_context, self.kafka)
+        self.processor2 = StreamsEosTestJobRunnerService(test_context, self.kafka)
+        self.verifier = StreamsEosTestVerifyRunnerService(test_context, self.kafka)
+
+    @cluster(num_nodes=8)
+    def test_rebalance(self):
+        """
+        Starts and stops two test clients a few times.
+        Ensures that all records are delivered exactly once.
+        """
+
+        self.driver.start()
+        self.processor1.start()
+
+        time.sleep(30)
+
+        self.processor2.start()
+
+        time.sleep(30)
+        self.processor1.stop()
+
+        time.sleep(30)
+        self.processor1.start()
+
+        time.sleep(30)
+        self.processor2.stop()
+
+        time.sleep(30)
+
+        self.driver.stop()
+
+        self.processor1.stop()
+        self.processor2.stop()
+
+        self.verifier.start()
+        self.verifier.wait()
+
+        self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)
+
+    @cluster(num_nodes=8)
+    def test_failure_and_recovery(self):
+        """
+        Starts two test clients, then aborts (kill -9) and restarts them a few times.
+        Ensures that all records are delivered exactly once.
+        """
+
+        self.driver.start()
+        self.processor1.start()
+        self.processor2.start()
+
+        time.sleep(30)
+        self.processor1.abortThenRestart()
+
+        time.sleep(30)
+        self.processor1.abortThenRestart()
+
+        time.sleep(30)
+        self.processor2.abortThenRestart()
+
+        time.sleep(30)
+
+        self.driver.stop()
+
+        self.processor1.stop()
+        self.processor2.stop()
+
+        self.verifier.start()
+        self.verifier.wait()
+
+        self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)