Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/14 16:19:02 UTC

kafka git commit: KAFKA-5361: Add more integration tests for Streams EOS

Repository: kafka
Updated Branches:
  refs/heads/trunk b20d9333b -> 160933bc0


KAFKA-5361: Add more integration tests for Streams EOS

 - multi-subtopology tests
 - fencing test
 - producer fenced bug fix: Streams did not recover correctly from ProducerFencedException

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3276 from mjsax/kafka-5361-add-eos-integration-tests-for-streams-api
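
The integration tests added by this commit run the Streams application with the exactly-once
processing guarantee enabled. A minimal sketch of that configuration is shown below; the
application id and bootstrap servers are placeholder values, not taken from this commit.

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class EosConfigExample {
        public static Properties eosConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-example");       // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // the guarantee the new EOS integration tests exercise
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            return props;
        }
    }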


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/160933bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/160933bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/160933bc

Branch: refs/heads/trunk
Commit: 160933bc092b4392cac1dd9497010320c6ae4d10
Parents: b20d933
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Jun 14 09:18:59 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 14 09:18:59 2017 -0700

----------------------------------------------------------------------
 .../internals/RecordCollectorImpl.java          |  10 +-
 .../processor/internals/StreamThread.java       |   7 +-
 .../streams/integration/EosIntegrationTest.java | 484 ++++++++++---------
 3 files changed, 270 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/160933bc/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index e1a86b4..d49cf58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
@@ -106,7 +107,11 @@ public class RecordCollectorImpl implements RecordCollector {
                         } else {
                             if (sendException == null) {
                                 sendException = exception;
-                                log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
+                                if (sendException instanceof ProducerFencedException) {
+                                    log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and it will be closed as it is a zombie.", logPrefix, topic, exception);
+                                } else {
+                                    log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
+                                }
                             }
                         }
                     }
@@ -125,6 +130,9 @@ public class RecordCollectorImpl implements RecordCollector {
 
     private void checkForException() {
         if (sendException != null) {
+            if (sendException instanceof ProducerFencedException) {
+                throw (ProducerFencedException) sendException;
+            }
             throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
         }
     }
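
The two hunks above change how a failed asynchronous send surfaces: the producer callback still
remembers the first exception, but checkForException() now rethrows a ProducerFencedException
as-is instead of wrapping it in a StreamsException, so the caller can tell a fenced ("zombie")
producer apart from other send failures. A simplified, self-contained sketch of that pattern
(not the actual RecordCollectorImpl) is shown below:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.streams.errors.StreamsException;

    class FencingAwareCollector {
        private final Producer<byte[], byte[]> producer;
        private volatile Exception sendException;

        FencingAwareCollector(final Producer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        void send(final ProducerRecord<byte[], byte[]> record) {
            checkForException();
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                    if (exception != null && sendException == null) {
                        sendException = exception; // surfaced by the next send() or explicit check
                    }
                }
            });
        }

        void checkForException() {
            if (sendException != null) {
                if (sendException instanceof ProducerFencedException) {
                    throw (ProducerFencedException) sendException; // zombie producer: do not wrap
                }
                throw new StreamsException("exception caught when producing", sendException);
            }
        }
    }

Keeping the fenced exception unwrapped is what allows task-level recovery instead of treating
every send failure as fatal.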

http://git-wip-us.apache.org/repos/asf/kafka/blob/160933bc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 23ce958..39369c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -259,7 +259,7 @@ public class StreamThread extends Thread {
                     } catch (final LockException e) {
                         // ignore and retry
                         if (!retryingTasks.contains(taskId)) {
-                            log.warn("{} Could not create task {}. Will retry. {}", logPrefix, taskId, e);
+                            log.warn("{} Could not create task {} due to {}. Will retry.", logPrefix, taskId, e.getMessage());
                             retryingTasks.add(taskId);
                         }
                     }
@@ -1427,7 +1427,10 @@ public class StreamThread extends Thread {
         try {
             task.close(false);
         } catch (final Exception f) {
-            log.warn("{} Failed to close zombie task: ", logPrefix, f);
+            if (!log.isDebugEnabled() && !log.isTraceEnabled()) {
+                log.warn("{} Failed to close zombie task: {}", logPrefix, f.getMessage());
+            }
+            log.debug("{} Failed to close zombie task: ", logPrefix, f);
         }
         activeTasks.remove(task.id);
     }
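
On the StreamThread side, a fenced producer marks its task as a zombie: per the hunk above, the
task is closed without committing (task.close(false)) and removed from the active set, and a
failure while closing it is merely logged (at WARN without the stack trace unless DEBUG/TRACE is
enabled). A simplified sketch of that recovery path, using a hypothetical Task interface rather
than the real Streams internals, is shown below:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.errors.ProducerFencedException;

    class ZombieTaskRecoveryExample {
        // hypothetical stand-in for the Streams task abstraction
        interface Task {
            String id();
            void process();            // may rethrow a ProducerFencedException captured on send
            void close(boolean clean); // clean == false: skip committing offsets/transaction
        }

        private final Map<String, Task> activeTasks = new HashMap<>();

        void processOrRecover(final Task task) {
            try {
                task.process();
            } catch (final ProducerFencedException zombie) {
                // another instance owns this task now; close it without committing
                try {
                    task.close(false);
                } catch (final RuntimeException swallowed) {
                    // closing a zombie is best effort; the real code logs and moves on
                }
                activeTasks.remove(task.id());
            }
        }
    }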

http://git-wip-us.apache.org/repos/asf/kafka/blob/160933bc/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 451d5e3..c7f4ff4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -48,6 +47,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -67,20 +67,30 @@ import static org.junit.Assert.fail;
 @Category({IntegrationTest.class})
 public class EosIntegrationTest {
     private static final int NUM_BROKERS = 3;
+    private static final int MAX_POLL_INTERVAL_MS = 5 * 1000;
+    private static final int MAX_WAIT_TIME_MS = 60 * 1000;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
+        {
+            put("auto.create.topics.enable", false);
+        }
+    });
 
     private static String applicationId;
     private final static String CONSUMER_GROUP_ID = "readCommitted";
     private final static String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
-    private final static String MULTI_PARTITION_INPUT_TOPIC = "inputTopic";
+    private final static String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
+    private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
     private final static int NUM_TOPIC_PARTITIONS = 2;
-    private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "outputTopic";
+    private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
+    private final static String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
     private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
     private final String storeName = "store";
 
     private AtomicBoolean errorInjected;
+    private AtomicBoolean injectGC;
+    private volatile boolean doGC = true;
     private AtomicInteger commitRequested;
     private Throwable uncaughtException;
 
@@ -89,45 +99,58 @@ public class EosIntegrationTest {
     @Before
     public void createTopics() throws Exception {
         applicationId = "appId-" + ++testNumber;
-        CLUSTER.deleteTopicsAndWait(SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
+        CLUSTER.deleteTopicsAndWait(
+            SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
+            SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC,
+            SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
 
-        CLUSTER.createTopics(SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+        CLUSTER.createTopics(SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
         CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
+        CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1);
+        CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
     }
 
     @Test
     public void shouldBeAbleToRunWithEosEnabled() throws Exception {
-        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
     }
 
     @Test
     public void shouldBeAbleToRestartAfterClose() throws Exception {
-        runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+        runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
     }
 
     @Test
     public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
-        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
+        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC);
     }
 
     @Test
     public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
-        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
+    }
+
+    @Test
+    public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
+        runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+    }
+
+    @Test
+    public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
+        runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
     }
 
     private void runSimpleCopyTest(final int numberOfRestarts,
                                    final String inputTopic,
+                                   final String throughTopic,
                                    final String outputTopic) throws Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         final KStream<Long, Long> input = builder.stream(inputTopic);
-        input
-            .mapValues(new ValueMapper<Long, Long>() {
-                @Override
-                public Long apply(final Long value) {
-                    return value;
-                }
-            })
-            .to(outputTopic);
+        KStream<Long, Long> output = input;
+        if (throughTopic != null) {
+            output = input.through(throughTopic);
+        }
+        output.to(outputTopic);
 
         for (int i = 0; i < numberOfRestarts; ++i) {
             final long factor = i;
@@ -140,6 +163,7 @@ public class EosIntegrationTest {
                     Serdes.LongSerde.class.getName(),
                     new Properties() {
                         {
+                            put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
                             put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                         }
                     }));
@@ -147,16 +171,7 @@ public class EosIntegrationTest {
             try {
                 streams.start();
 
-                final List<KeyValue<Long, Long>> inputData = new ArrayList<KeyValue<Long, Long>>() {
-                    {
-                        add(new KeyValue<>(0L, factor * 100));
-                        add(new KeyValue<>(0L, factor * 100 + 1L));
-                        add(new KeyValue<>(0L, factor * 100 + 2L));
-                        add(new KeyValue<>(1L, factor * 100));
-                        add(new KeyValue<>(1L, factor * 100 + 1L));
-                        add(new KeyValue<>(1L, factor * 100 + 2L));
-                    }
-                };
+                final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);
 
                 IntegrationTestUtils.produceKeyValuesSynchronously(
                     inputTopic,
@@ -239,22 +254,8 @@ public class EosIntegrationTest {
         try {
             streams.start();
 
-            final List<KeyValue<Long, Long>> firstBurstOfData = new ArrayList<KeyValue<Long, Long>>() {
-                {
-                    add(new KeyValue<>(0L, 0L));
-                    add(new KeyValue<>(0L, 1L));
-                    add(new KeyValue<>(0L, 2L));
-                    add(new KeyValue<>(0L, 3L));
-                    add(new KeyValue<>(0L, 4L));
-                }
-            };
-            final List<KeyValue<Long, Long>> secondBurstOfData = new ArrayList<KeyValue<Long, Long>>() {
-                {
-                    add(new KeyValue<>(0L, 5L));
-                    add(new KeyValue<>(0L, 6L));
-                    add(new KeyValue<>(0L, 7L));
-                }
-            };
+            final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
+            final List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);
 
             IntegrationTestUtils.produceKeyValuesSynchronously(
                 SINGLE_PARTITION_INPUT_TOPIC,
@@ -320,67 +321,18 @@ public class EosIntegrationTest {
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records (even if 50 record got written)
 
-        final KafkaStreams streams = getKafkaStreams(false);
+        final KafkaStreams streams = getKafkaStreams(false, "appDir", 2);
         try {
             streams.start();
 
-            final List<KeyValue<Long, Long>> committedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
-                {
-                    add(new KeyValue<>(0L, 0L));
-                    add(new KeyValue<>(0L, 1L));
-                    add(new KeyValue<>(0L, 2L));
-                    add(new KeyValue<>(0L, 3L));
-                    add(new KeyValue<>(0L, 4L));
-                    add(new KeyValue<>(0L, 5L));
-                    add(new KeyValue<>(0L, 6L));
-                    add(new KeyValue<>(0L, 7L));
-                    add(new KeyValue<>(0L, 8L));
-                    add(new KeyValue<>(0L, 9L));
-                    add(new KeyValue<>(1L, 0L));
-                    add(new KeyValue<>(1L, 1L));
-                    add(new KeyValue<>(1L, 2L));
-                    add(new KeyValue<>(1L, 3L));
-                    add(new KeyValue<>(1L, 4L));
-                    add(new KeyValue<>(1L, 5L));
-                    add(new KeyValue<>(1L, 6L));
-                    add(new KeyValue<>(1L, 7L));
-                    add(new KeyValue<>(1L, 8L));
-                    add(new KeyValue<>(1L, 9L));
-                }
-            };
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
-                {
-                    add(new KeyValue<>(0L, 10L));
-                    add(new KeyValue<>(0L, 11L));
-                    add(new KeyValue<>(0L, 12L));
-                    add(new KeyValue<>(0L, 13L));
-                    add(new KeyValue<>(0L, 14L));
-                    add(new KeyValue<>(1L, 10L));
-                    add(new KeyValue<>(1L, 11L));
-                    add(new KeyValue<>(1L, 12L));
-                    add(new KeyValue<>(1L, 13L));
-                    add(new KeyValue<>(1L, 14L));
-                }
-            };
+            final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
 
             final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
             dataBeforeFailure.addAll(committedDataBeforeFailure);
             dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
 
-            final List<KeyValue<Long, Long>> dataAfterFailure = new ArrayList<KeyValue<Long, Long>>() {
-                {
-                    add(new KeyValue<>(0L, 15L));
-                    add(new KeyValue<>(0L, 16L));
-                    add(new KeyValue<>(0L, 17L));
-                    add(new KeyValue<>(0L, 18L));
-                    add(new KeyValue<>(0L, 19L));
-                    add(new KeyValue<>(1L, 15L));
-                    add(new KeyValue<>(1L, 16L));
-                    add(new KeyValue<>(1L, 17L));
-                    add(new KeyValue<>(1L, 18L));
-                    add(new KeyValue<>(1L, 19L));
-                }
-            };
+            final List<KeyValue<Long, Long>> dataAfterFailure = prepareData(15L, 20L, 0L, 1L);
 
             writeInputData(committedDataBeforeFailure);
 
@@ -389,7 +341,7 @@ public class EosIntegrationTest {
                 public boolean conditionMet() {
                     return commitRequested.get() == 2;
                 }
-            }, 60000, "SteamsTasks did not request commit.");
+            }, MAX_WAIT_TIME_MS, "StreamsTasks did not request commit.");
 
             writeInputData(uncommittedDataBeforeFailure);
 
@@ -407,7 +359,7 @@ public class EosIntegrationTest {
                 public boolean conditionMet() {
                     return uncaughtException != null;
                 }
-            }, 60000, "Should receive uncaught exception from one StreamThread.");
+            }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread.");
 
             final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
                 committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
@@ -433,9 +385,199 @@ public class EosIntegrationTest {
         }
     }
 
-    private KafkaStreams getKafkaStreams(final boolean withState) {
+    @Test
+    public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
+        // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
+        // the app is supposed to emit all 40 update records into the output topic
+        // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+        // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
+        //
+        // the failure gets injected after 20 committed and 30 uncommitted records were received
+        // -> the failure only kills one thread
+        // after fail over, we should read 40 committed records and the state stores should contain the correct sums
+        // per key (even if some records got processed twice)
+
+        final KafkaStreams streams = getKafkaStreams(true, "appDir", 2);
+        try {
+            streams.start();
+
+            final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
+
+            final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
+            dataBeforeFailure.addAll(committedDataBeforeFailure);
+            dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
+
+            final List<KeyValue<Long, Long>> dataAfterFailure = prepareData(15L, 20L, 0L, 1L);
+
+            writeInputData(committedDataBeforeFailure);
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return commitRequested.get() == 2;
+                }
+            }, MAX_WAIT_TIME_MS, "StreamsTasks did not request commit.");
+
+            writeInputData(uncommittedDataBeforeFailure);
+
+            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
+            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
+
+            final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
+            checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure));
+            checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
+            verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
+
+            errorInjected.set(true);
+            writeInputData(dataAfterFailure);
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return uncaughtException != null;
+                }
+            }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread.");
+
+            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
+                committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
+                CONSUMER_GROUP_ID + "_ALL");
+
+            final List<KeyValue<Long, Long>> committedRecordsAfterFailure = readResult(
+                uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
+                CONSUMER_GROUP_ID);
+
+            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+            allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
+            allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
+            allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
+
+            final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
+
+            checkResultPerKey(allCommittedRecords, expectedResult);
+            checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
+
+            verifyStateStore(streams, getMaxPerKey(expectedResult));
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
+    public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
+        // this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
+        // the app is supposed to copy all 60 records into the output topic
+        // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+        //
+        // a GC pause gets injected after 20 committed and 30 uncommitted records were received
+        // -> the GC pause only affects one thread and should trigger a rebalance
+        // after rebalancing, we should read 40 committed records (even if 50 records got written)
+        //
+        // afterwards, the "stalling" thread resumes, and another rebalance should get triggered
+        // we write the remaining 20 records and verify that we read 60 result records
+
+        final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1);
+        final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1);
+        try {
+            streams1.start();
+            streams2.start();
+
+            final List<KeyValue<Long, Long>> committedDataBeforeGC = prepareData(0L, 10L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeGC = prepareData(10L, 15L, 0L, 1L);
+
+            final List<KeyValue<Long, Long>> dataBeforeGC = new ArrayList<>();
+            dataBeforeGC.addAll(committedDataBeforeGC);
+            dataBeforeGC.addAll(uncommittedDataBeforeGC);
+
+            final List<KeyValue<Long, Long>> dataToTriggerFirstRebalance = prepareData(15L, 20L, 0L, 1L);
+
+            final List<KeyValue<Long, Long>> dataAfterSecondRebalance = prepareData(20L, 30L, 0L, 1L);
+
+            writeInputData(committedDataBeforeGC);
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return commitRequested.get() == 2;
+                }
+            }, MAX_WAIT_TIME_MS, "StreamsTasks did not request commit.");
+
+            writeInputData(uncommittedDataBeforeGC);
+
+            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeGC.size(), null);
+            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeGC.size(), CONSUMER_GROUP_ID);
+
+            checkResultPerKey(committedRecords, committedDataBeforeGC);
+            checkResultPerKey(uncommittedRecords, dataBeforeGC);
+
+            injectGC.set(true);
+            writeInputData(dataToTriggerFirstRebalance);
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1 &&
+                        (streams1.allMetadata().iterator().next().topicPartitions().size() == 2
+                        || streams2.allMetadata().iterator().next().topicPartitions().size() == 2);
+                }
+            }, MAX_WAIT_TIME_MS, "Should have rebalanced.");
+
+            final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
+                uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(),
+                CONSUMER_GROUP_ID);
+
+            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
+            expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeGC);
+            expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
+
+            checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
+
+            doGC = false;
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1
+                        && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
+                        && streams2.allMetadata().iterator().next().topicPartitions().size() == 1;
+                }
+            }, MAX_WAIT_TIME_MS, "Should have rebalanced.");
+
+            writeInputData(dataAfterSecondRebalance);
+
+            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
+                committedDataBeforeGC.size() + uncommittedDataBeforeGC.size()
+                + dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size(),
+                CONSUMER_GROUP_ID + "_ALL");
+
+            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+            allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeGC);
+            allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeGC);
+            allExpectedCommittedRecordsAfterRecovery.addAll(dataToTriggerFirstRebalance);
+            allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
+
+            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
+        } finally {
+            streams1.close();
+            streams2.close();
+        }
+    }
+
+    private List<KeyValue<Long, Long>> prepareData(final long fromInclusive, final long toExclusive, final Long... keys) {
+        final List<KeyValue<Long, Long>> data = new ArrayList<>();
+
+        for (final Long k : keys) {
+            for (long v = fromInclusive; v < toExclusive; ++v) {
+                data.add(new KeyValue<>(k, v));
+            }
+        }
+
+        return data;
+    }
+
+    private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) {
         commitRequested = new AtomicInteger(0);
         errorInjected = new AtomicBoolean(false);
+        injectGC = new AtomicBoolean(false);
         final KStreamBuilder builder = new KStreamBuilder();
 
         String[] storeNames = null;
@@ -449,6 +591,7 @@ public class EosIntegrationTest {
 
             builder.addStateStore(storeSupplier);
         }
+
         final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
         input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
             @SuppressWarnings("unchecked")
@@ -473,6 +616,15 @@ public class EosIntegrationTest {
                             // only tries to fail once on one of the task
                             throw new RuntimeException("Injected test exception.");
                         }
+                        if (injectGC.compareAndSet(true, false)) {
+                            while (doGC) {
+                                try {
+                                    Thread.sleep(100);
+                                } catch (final InterruptedException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        }
 
                         if ((value + 1) % 10 == 0) {
                             context.commit();
@@ -514,11 +666,14 @@ public class EosIntegrationTest {
                 new Properties() {
                     {
                         put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-                        put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+                        put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
                         put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, -1);
                         put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
                         put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
                         put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+                        put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
+                        put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:2142");
                     }
                 }));
 
@@ -572,133 +727,6 @@ public class EosIntegrationTest {
         );
     }
 
-    @Test
-    public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
-        // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
-        // the app is supposed to emit all 40 update records into the output topic
-        // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
-        // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
-        //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
-        // -> the failure only kills one thread
-        // after fail over, we should read 40 committed records and the state stores should contain the correct sums
-        // per key (even if some records got processed twice)
-
-        final KafkaStreams streams = getKafkaStreams(true);
-        try {
-            streams.start();
-
-            final List<KeyValue<Long, Long>> committedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
-                {
-                    add(new KeyValue<>(0L, 0L));
-                    add(new KeyValue<>(0L, 1L));
-                    add(new KeyValue<>(0L, 2L));
-                    add(new KeyValue<>(0L, 3L));
-                    add(new KeyValue<>(0L, 4L));
-                    add(new KeyValue<>(0L, 5L));
-                    add(new KeyValue<>(0L, 6L));
-                    add(new KeyValue<>(0L, 7L));
-                    add(new KeyValue<>(0L, 8L));
-                    add(new KeyValue<>(0L, 9L));
-                    add(new KeyValue<>(1L, 0L));
-                    add(new KeyValue<>(1L, 1L));
-                    add(new KeyValue<>(1L, 2L));
-                    add(new KeyValue<>(1L, 3L));
-                    add(new KeyValue<>(1L, 4L));
-                    add(new KeyValue<>(1L, 5L));
-                    add(new KeyValue<>(1L, 6L));
-                    add(new KeyValue<>(1L, 7L));
-                    add(new KeyValue<>(1L, 8L));
-                    add(new KeyValue<>(1L, 9L));
-                }
-            };
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
-                {
-                    add(new KeyValue<>(0L, 10L));
-                    add(new KeyValue<>(0L, 11L));
-                    add(new KeyValue<>(0L, 12L));
-                    add(new KeyValue<>(0L, 13L));
-                    add(new KeyValue<>(0L, 14L));
-                    add(new KeyValue<>(1L, 10L));
-                    add(new KeyValue<>(1L, 11L));
-                    add(new KeyValue<>(1L, 12L));
-                    add(new KeyValue<>(1L, 13L));
-                    add(new KeyValue<>(1L, 14L));
-                }
-            };
-
-            final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
-            dataBeforeFailure.addAll(committedDataBeforeFailure);
-            dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
-
-            final List<KeyValue<Long, Long>> dataAfterFailure = new ArrayList<KeyValue<Long, Long>>() {
-                {
-                    add(new KeyValue<>(0L, 15L));
-                    add(new KeyValue<>(0L, 16L));
-                    add(new KeyValue<>(0L, 17L));
-                    add(new KeyValue<>(0L, 18L));
-                    add(new KeyValue<>(0L, 19L));
-                    add(new KeyValue<>(1L, 15L));
-                    add(new KeyValue<>(1L, 16L));
-                    add(new KeyValue<>(1L, 17L));
-                    add(new KeyValue<>(1L, 18L));
-                    add(new KeyValue<>(1L, 19L));
-                }
-            };
-
-            writeInputData(committedDataBeforeFailure);
-
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return commitRequested.get() == 2;
-                }
-            }, 60000, "SteamsTasks did not request commit.");
-
-            writeInputData(uncommittedDataBeforeFailure);
-
-            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
-            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
-
-            final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
-            checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure));
-            checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
-            verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
-
-            errorInjected.set(true);
-            writeInputData(dataAfterFailure);
-
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return uncaughtException != null;
-                }
-            }, 60000, "Should receive uncaught exception from one StreamThread.");
-
-            final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
-                committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
-                CONSUMER_GROUP_ID + "_ALL");
-
-            final List<KeyValue<Long, Long>> committedRecordsAfterFailure = readResult(
-                uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
-                CONSUMER_GROUP_ID);
-
-            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
-            allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
-            allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
-            allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
-
-            final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
-
-            checkResultPerKey(allCommittedRecords, expectedResult);
-            checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
-
-            verifyStateStore(streams, getMaxPerKey(expectedResult));
-        } finally {
-            streams.close();
-        }
-    }
-
     private List<KeyValue<Long, Long>> computeExpectedResult(final List<KeyValue<Long, Long>> input) {
         final List<KeyValue<Long, Long>> expectedResult = new ArrayList<>(input.size());