You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/04/09 19:46:42 UTC

[kafka] branch 2.7 updated: KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance (#10301)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 232cd33  KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance (#10301)
232cd33 is described below

commit 232cd33f1d49e5b3143040869d8954c018805d03
Author: Luke Chen <43...@users.noreply.github.com>
AuthorDate: Sat Apr 10 03:19:14 2021 +0800

    KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance (#10301)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../streams/integration/EosIntegrationTest.java    | 220 ++++++++++++++++-----
 2 files changed, 171 insertions(+), 51 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 4ff500b..97a96c8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -188,7 +188,7 @@
               files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
 
     <suppress checks="MethodLength"
-              files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
+              files="(EosIntegrationTest|EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
               files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index ff3eeae..4b0f59a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -82,10 +82,10 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa
 import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 @Category({IntegrationTest.class})
@@ -121,6 +121,8 @@ public class EosIntegrationTest {
 
     private static final AtomicInteger TEST_NUMBER = new AtomicInteger(0);
 
+    private volatile boolean hasUnexpectedError = false;
+
     @Parameters(name = "{0}")
     public static Collection<String[]> data() {
         return Arrays.asList(new String[][] {
@@ -267,18 +269,20 @@ public class EosIntegrationTest {
                         inputData.size()
                     );
 
-                checkResultPerKey(committedRecords, inputData);
+                checkResultPerKey(committedRecords, inputData, "The committed records do not match what expected");
             }
         }
     }
 
-    private void checkResultPerKey(final List<KeyValue<Long, Long>> result, final List<KeyValue<Long, Long>> expectedResult) {
+    private void checkResultPerKey(final List<KeyValue<Long, Long>> result,
+                                   final List<KeyValue<Long, Long>> expectedResult,
+                                   final String reason) {
         final Set<Long> allKeys = new HashSet<>();
         addAllKeys(allKeys, result);
         addAllKeys(allKeys, expectedResult);
 
         for (final Long key : allKeys) {
-            assertThat(getAllRecordPerKey(key, result), equalTo(getAllRecordPerKey(key, expectedResult)));
+            assertThat(reason, getAllRecordPerKey(key, result), equalTo(getAllRecordPerKey(key, expectedResult)));
         }
     }
 
@@ -379,19 +383,21 @@ public class EosIntegrationTest {
     public void shouldNotViolateEosIfOneTaskFails() throws Exception {
         // this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to copy all 40 records into the output topic
-        // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+        //
+        // the app first commits after each 10 records per partition (total 20 records), and thus will have 2 * 5 uncommitted writes
         //
        // the failure gets injected after 20 committed and 30 uncommitted records got received
         // -> the failure only kills one thread
        // after fail over, we should read 40 committed records (even if 50 records got written)
 
-        try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig)) {
+        try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig, MAX_POLL_INTERVAL_MS)) {
             startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
             final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
 
-            final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
+            final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(
+                committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size());
             dataBeforeFailure.addAll(committedDataBeforeFailure);
             dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
 
@@ -403,13 +409,29 @@ public class EosIntegrationTest {
                 () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
                 "StreamsTasks did not request commit.");
 
-            writeInputData(uncommittedDataBeforeFailure);
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C
+            // p-1: ---> 10 rec + C
 
-            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
             final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
+            checkResultPerKey(
+                committedRecords,
+                committedDataBeforeFailure,
+                "The committed records before failure do not match what expected");
 
-            checkResultPerKey(committedRecords, committedDataBeforeFailure);
-            checkResultPerKey(uncommittedRecords, dataBeforeFailure);
+            writeInputData(uncommittedDataBeforeFailure);
+
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C  + 5 rec (pending)
+            // p-1: ---> 10 rec + C  + 5 rec (pending)
+
+            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
+            checkResultPerKey(
+                uncommittedRecords,
+                dataBeforeFailure,
+                "The uncommitted records before failure do not match what expected");
 
             errorInjected.set(true);
             writeInputData(dataAfterFailure);
@@ -418,6 +440,11 @@ public class EosIntegrationTest {
                 () -> uncaughtException != null, MAX_WAIT_TIME_MS,
                 "Should receive uncaught exception from one StreamThread.");
 
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C  + 5 rec + C    + 5 rec + C
+            // p-1: ---> 10 rec + C  + 5 rec + C    + 5 rec + C
+
             final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
                 committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
                 CONSUMER_GROUP_ID + "_ALL");
@@ -426,17 +453,28 @@ public class EosIntegrationTest {
                 uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
                 CONSUMER_GROUP_ID);
 
-            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+            final int allCommittedRecordsAfterRecoverySize = committedDataBeforeFailure.size() +
+                uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
+            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
             allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
             allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
             allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
 
-            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>();
+            final int committedRecordsAfterRecoverySize = uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
+            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>(committedRecordsAfterRecoverySize);
             expectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
             expectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
 
-            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
-            checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
+            checkResultPerKey(
+                allCommittedRecords,
+                allExpectedCommittedRecordsAfterRecovery,
+                "The all committed records after recovery do not match what expected");
+            checkResultPerKey(
+                committedRecordsAfterFailure,
+                expectedCommittedRecordsAfterRecovery,
+                "The committed records after recovery do not match what expected");
+
+            assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, is(false));
         }
     }
 
@@ -444,22 +482,26 @@ public class EosIntegrationTest {
     public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
         // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to emit all 40 update records into the output topic
-        // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+        //
+        // the app first commits after each 10 records per partition (total 20 records), and thus will have 2 * 5 uncommitted writes
         // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
-        // in the uncommitted batch sending some data for the new key to validate that upon resuming they will not be shown up in the store
+        // in the uncommitted batch, sending some data for the new key to validate that upon resuming they will not show up in the store
         //
-        // the failure gets inject after 20 committed and 10 uncommitted records got received
-        // the failure gets injected after 20 committed and 30 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should contain the correct sums
         // per key (even if some records got processed twice)
 
-        try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig)) {
+        // We need more processing time under "with state" situation, so increasing the max.poll.interval.ms
+        // to avoid an unexpected rebalance during the test, which would otherwise trigger an unexpected failover
+        try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig, 3 * MAX_POLL_INTERVAL_MS)) {
             startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
             final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
 
-            final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
+            final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(
+                committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size());
             dataBeforeFailure.addAll(committedDataBeforeFailure);
             dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
 
@@ -471,15 +513,36 @@ public class EosIntegrationTest {
                 () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
                 "SteamsTasks did not request commit.");
 
-            writeInputData(uncommittedDataBeforeFailure);
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C
+            // p-1: ---> 10 rec + C
 
-            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
             final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
+            checkResultPerKey(
+                committedRecords,
+                computeExpectedResult(committedDataBeforeFailure),
+                "The committed records before failure do not match what expected");
+
+            writeInputData(uncommittedDataBeforeFailure);
 
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C  + 5 rec (pending)
+            // p-1: ---> 10 rec + C  + 5 rec (pending)
+
+            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
             final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
-            checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure));
-            checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
-            verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
+
+
+            checkResultPerKey(
+                uncommittedRecords,
+                expectedResultBeforeFailure,
+                "The uncommitted records before failure do not match what expected");
+            verifyStateStore(
+                streams,
+                getMaxPerKey(expectedResultBeforeFailure),
+                "The state store content before failure do not match what expected");
 
             errorInjected.set(true);
             writeInputData(dataAfterFailure);
@@ -488,6 +551,11 @@ public class EosIntegrationTest {
                 () -> uncaughtException != null, MAX_WAIT_TIME_MS,
                 "Should receive uncaught exception from one StreamThread.");
 
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C  + 5 rec + C    + 5 rec + C
+            // p-1: ---> 10 rec + C  + 5 rec + C    + 5 rec + C
+
             final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
                 committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
                 CONSUMER_GROUP_ID + "_ALL");
@@ -496,19 +564,31 @@ public class EosIntegrationTest {
                 uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
                 CONSUMER_GROUP_ID);
 
-            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+            final int allCommittedRecordsAfterRecoverySize = committedDataBeforeFailure.size() +
+                uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
+            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
             allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
             allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
             allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
 
             final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
 
-            checkResultPerKey(allCommittedRecords, expectedResult);
+            checkResultPerKey(
+                allCommittedRecords,
+                expectedResult,
+                "The all committed records after recovery do not match what expected");
+
             checkResultPerKey(
                 committedRecordsAfterFailure,
-                expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
+                expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()),
+                "The committed records after recovery do not match what expected");
+
+            verifyStateStore(
+                streams,
+                getMaxPerKey(expectedResult),
+                "The state store content after recovery do not match what expected");
 
-            verifyStateStore(streams, getMaxPerKey(expectedResult));
+            assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, is(false));
         }
     }
 
@@ -516,9 +596,10 @@ public class EosIntegrationTest {
     public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
         // this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
         // the app is supposed to copy all 60 records into the output topic
-        // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         //
-        // a stall gets injected after 20 committed and 30 uncommitted records got received
+        // the app first commits after each 10 records per partition, and thus will have 2 * 5 uncommitted writes
+        //
+        // Then, a stall gets injected after 20 committed and 30 uncommitted records got received
         // -> the stall only affects one thread and should trigger a rebalance
         // after rebalancing, we should read 40 committed records (even if 50 record got written)
         //
@@ -526,8 +607,8 @@ public class EosIntegrationTest {
         // we write the remaining 20 records and verify to read 60 result records
 
         try (
-            final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig);
-            final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig)
+            final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig, MAX_POLL_INTERVAL_MS);
+            final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig, MAX_POLL_INTERVAL_MS)
         ) {
             startKafkaStreamsAndWaitForRunningState(streams1, MAX_WAIT_TIME_MS);
             startKafkaStreamsAndWaitForRunningState(streams2, MAX_WAIT_TIME_MS);
@@ -535,7 +616,8 @@ public class EosIntegrationTest {
             final List<KeyValue<Long, Long>> committedDataBeforeStall = prepareData(0L, 10L, 0L, 1L);
             final List<KeyValue<Long, Long>> uncommittedDataBeforeStall = prepareData(10L, 15L, 0L, 1L);
 
-            final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>();
+            final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>(
+                committedDataBeforeStall.size() + uncommittedDataBeforeStall.size());
             dataBeforeStall.addAll(committedDataBeforeStall);
             dataBeforeStall.addAll(uncommittedDataBeforeStall);
 
@@ -549,13 +631,29 @@ public class EosIntegrationTest {
                 () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
                 "SteamsTasks did not request commit.");
 
-            writeInputData(uncommittedDataBeforeStall);
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C
+            // p-1: ---> 10 rec + C
 
-            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
             final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeStall.size(), CONSUMER_GROUP_ID);
+            checkResultPerKey(
+                committedRecords,
+                committedDataBeforeStall,
+                "The committed records before stall do not match what expected");
+
+            writeInputData(uncommittedDataBeforeStall);
 
-            checkResultPerKey(committedRecords, committedDataBeforeStall);
-            checkResultPerKey(uncommittedRecords, dataBeforeStall);
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C  + 5 rec (pending)
+            // p-1: ---> 10 rec + C  + 5 rec (pending)
+
+            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
+            checkResultPerKey(
+                uncommittedRecords,
+                dataBeforeStall,
+                "The uncommitted records before stall do not match what expected");
 
             LOG.info("Injecting Stall");
             stallInjected.set(true);
@@ -591,15 +689,24 @@ public class EosIntegrationTest {
                     "Streams1[" + streams1.allMetadata() + "]\n" +
                     "Streams2[" + streams2.allMetadata() + "]");
 
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C  + 5 rec + C    + 5 rec + C
+            // p-1: ---> 10 rec + C  + 5 rec + C    + 5 rec + C
+
             final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
                 uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size(),
                 CONSUMER_GROUP_ID);
 
-            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
+            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>(
+                uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size());
             expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeStall);
             expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
 
-            checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
+            checkResultPerKey(
+                committedRecordsAfterRebalance,
+                expectedCommittedRecordsAfterRebalance,
+                "The all committed records after rebalance do not match what expected");
 
             LOG.info("Releasing Stall");
             doStall = false;
@@ -618,25 +725,36 @@ public class EosIntegrationTest {
 
             writeInputData(dataAfterSecondRebalance);
 
+            // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+            //
+            // p-0: ---> 10 rec + C  + 5 rec + C    + 5 rec + C   + 10 rec + C
+            // p-1: ---> 10 rec + C  + 5 rec + C    + 5 rec + C   + 10 rec + C
+
             final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
                 committedDataBeforeStall.size() + uncommittedDataBeforeStall.size()
                 + dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size(),
                 CONSUMER_GROUP_ID + "_ALL");
 
-            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+            final int allCommittedRecordsAfterRecoverySize = committedDataBeforeStall.size() +
+                uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size();
+            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
             allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeStall);
             allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeStall);
             allExpectedCommittedRecordsAfterRecovery.addAll(dataToTriggerFirstRebalance);
             allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
 
-            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
+            checkResultPerKey(
+                allCommittedRecords,
+                allExpectedCommittedRecordsAfterRecovery,
+                "The all committed records after recovery do not match what expected");
         }
     }
 
     private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
                                                    final long toExclusive,
                                                    final Long... keys) {
-        final List<KeyValue<Long, Long>> data = new ArrayList<>();
+        final Long dataSize = keys.length * (toExclusive - fromInclusive);
+        final List<KeyValue<Long, Long>> data = new ArrayList<>(dataSize.intValue());
 
         for (final Long k : keys) {
             for (long v = fromInclusive; v < toExclusive; ++v) {
@@ -651,7 +769,8 @@ public class EosIntegrationTest {
                                          final boolean withState,
                                          final String appDir,
                                          final int numberOfStreamsThreads,
-                                         final String eosConfig) {
+                                         final String eosConfig,
+                                         final int maxPollIntervalMs) {
         commitRequested = new AtomicInteger(0);
         errorInjected = new AtomicBoolean(false);
         stallInjected = new AtomicBoolean(false);
@@ -748,7 +867,7 @@ public class EosIntegrationTest {
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
-        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
@@ -763,9 +882,9 @@ public class EosIntegrationTest {
         final KafkaStreams streams = new KafkaStreams(builder.build(), config);
 
         streams.setUncaughtExceptionHandler((t, e) -> {
-            if (uncaughtException != null) {
+            if (uncaughtException != null || !e.getMessage().contains("Injected test exception")) {
                 e.printStackTrace(System.err);
-                fail("Should only get one uncaught exception from Streams.");
+                hasUnexpectedError = true;
             }
             uncaughtException = e;
         });
@@ -847,16 +966,17 @@ public class EosIntegrationTest {
     }
 
     private void verifyStateStore(final KafkaStreams streams,
-                                  final Set<KeyValue<Long, Long>> expectedStoreContent) throws Exception {
+                                  final Set<KeyValue<Long, Long>> expectedStoreContent,
+                                  final String reason) throws Exception {
         final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
             .getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore());
         assertNotNull(store);
 
         final KeyValueIterator<Long, Long> it = store.all();
         while (it.hasNext()) {
-            assertTrue(expectedStoreContent.remove(it.next()));
+            assertTrue(reason, expectedStoreContent.remove(it.next()));
         }
 
-        assertTrue(expectedStoreContent.isEmpty());
+        assertTrue(reason, expectedStoreContent.isEmpty());
     }
 }