Posted to commits@kafka.apache.org by mj...@apache.org on 2021/04/09 19:46:42 UTC
[kafka] branch 2.7 updated: KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance (#10301)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 232cd33 KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance (#10301)
232cd33 is described below
commit 232cd33f1d49e5b3143040869d8954c018805d03
Author: Luke Chen <43...@users.noreply.github.com>
AuthorDate: Sat Apr 10 03:19:14 2021 +0800
KAFKA-9831: increase max.poll.interval.ms to avoid unexpected rebalance (#10301)
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
checkstyle/suppressions.xml | 2 +-
.../streams/integration/EosIntegrationTest.java | 220 ++++++++++++++++-----
2 files changed, 171 insertions(+), 51 deletions(-)
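
Note on the fix: per the commit message, the stateful EosIntegrationTest could take long enough between polls to exceed the consumer's max.poll.interval.ms, triggering an unexpected rebalance and fail over. The patch threads a maxPollIntervalMs parameter into getKafkaStreams() and triples it for the "with state" test. Below is a minimal sketch of how a Streams test can override the embedded consumer's poll interval; the application id and bootstrap address are illustrative placeholders, not taken from this commit:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class MaxPollIntervalConfigSketch {
    // Build a Streams config whose embedded consumer tolerates slow processing.
    public static Properties streamsConfig(final int maxPollIntervalMs) {
        final Properties properties = new Properties();
        // illustrative placeholders; a real test supplies its own values
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-test-app");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // consumerPrefix() scopes the property to the consumer client, so a
        // test that needs more processing time can raise the rebalance deadline
        properties.put(
            StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
            maxPollIntervalMs);
        return properties;
    }
}
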
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 4ff500b..97a96c8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -188,7 +188,7 @@
files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
<suppress checks="MethodLength"
- files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
+ files="(EosIntegrationTest|EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index ff3eeae..4b0f59a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -82,10 +82,10 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.wa
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
@@ -121,6 +121,8 @@ public class EosIntegrationTest {
private static final AtomicInteger TEST_NUMBER = new AtomicInteger(0);
+ private volatile boolean hasUnexpectedError = false;
+
@Parameters(name = "{0}")
public static Collection<String[]> data() {
return Arrays.asList(new String[][] {
@@ -267,18 +269,20 @@ public class EosIntegrationTest {
inputData.size()
);
- checkResultPerKey(committedRecords, inputData);
+ checkResultPerKey(committedRecords, inputData, "The committed records do not match what is expected");
}
}
}
- private void checkResultPerKey(final List<KeyValue<Long, Long>> result, final List<KeyValue<Long, Long>> expectedResult) {
+ private void checkResultPerKey(final List<KeyValue<Long, Long>> result,
+ final List<KeyValue<Long, Long>> expectedResult,
+ final String reason) {
final Set<Long> allKeys = new HashSet<>();
addAllKeys(allKeys, result);
addAllKeys(allKeys, expectedResult);
for (final Long key : allKeys) {
- assertThat(getAllRecordPerKey(key, result), equalTo(getAllRecordPerKey(key, expectedResult)));
+ assertThat(reason, getAllRecordPerKey(key, result), equalTo(getAllRecordPerKey(key, expectedResult)));
}
}
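
The reason string threaded through checkResultPerKey() above uses Hamcrest's three-argument overload, assertThat(String reason, T actual, Matcher<? super T> matcher), which prints the reason ahead of the expected/actual diff so a failing run names the exact check that broke. A minimal, self-contained sketch with illustrative values:

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Arrays;
import java.util.List;

public class ReasonedAssertSketch {
    public static void main(final String[] args) {
        final List<Long> actual = Arrays.asList(1L, 2L, 3L);
        final List<Long> expected = Arrays.asList(1L, 2L, 3L);
        // on mismatch, the reason is printed before "Expected: ... but: was ..."
        assertThat("The committed records do not match what is expected",
                   actual, equalTo(expected));
    }
}
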
@@ -379,19 +383,21 @@ public class EosIntegrationTest {
public void shouldNotViolateEosIfOneTaskFails() throws Exception {
// this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to copy all 40 records into the output topic
- // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+ //
+ // the app first commits after each 10 records per partition (total 20 records), and thus will have 2 * 5 uncommitted writes
//
// the failure gets injected after 20 committed and 30 uncommitted records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records (even if 50 records got written)
- try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig)) {
+ try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig, MAX_POLL_INTERVAL_MS)) {
startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
- final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
+ final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(
+ committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size());
dataBeforeFailure.addAll(committedDataBeforeFailure);
dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
@@ -403,13 +409,29 @@ public class EosIntegrationTest {
() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
"StreamsTasks did not request commit.");
- writeInputData(uncommittedDataBeforeFailure);
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C
+ // p-1: ---> 10 rec + C
- final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
+ checkResultPerKey(
+ committedRecords,
+ committedDataBeforeFailure,
+ "The committed records before failure do not match what expected");
- checkResultPerKey(committedRecords, committedDataBeforeFailure);
- checkResultPerKey(uncommittedRecords, dataBeforeFailure);
+ writeInputData(uncommittedDataBeforeFailure);
+
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C + 5 rec (pending)
+ // p-1: ---> 10 rec + C + 5 rec (pending)
+
+ final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
+ checkResultPerKey(
+ uncommittedRecords,
+ dataBeforeFailure,
+ "The uncommitted records before failure do not match what expected");
errorInjected.set(true);
writeInputData(dataAfterFailure);
@@ -418,6 +440,11 @@ public class EosIntegrationTest {
() -> uncaughtException != null, MAX_WAIT_TIME_MS,
"Should receive uncaught exception from one StreamThread.");
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C + 5 rec + C + 5 rec + C
+ // p-1: ---> 10 rec + C + 5 rec + C + 5 rec + C
+
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
CONSUMER_GROUP_ID + "_ALL");
@@ -426,17 +453,28 @@ public class EosIntegrationTest {
uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
CONSUMER_GROUP_ID);
- final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ final int allCommittedRecordsAfterRecoverySize = committedDataBeforeFailure.size() +
+ uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
+ final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
- final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ final int committedRecordsAfterRecoverySize = uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
+ final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>(committedRecordsAfterRecoverySize);
expectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
expectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
- checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
- checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
+ checkResultPerKey(
+ allCommittedRecords,
+ allExpectedCommittedRecordsAfterRecovery,
+ "The all committed records after recovery do not match what expected");
+ checkResultPerKey(
+ committedRecordsAfterFailure,
+ expectedCommittedRecordsAfterRecovery,
+ "The committed records after recovery do not match what expected");
+
+ assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, is(false));
}
}
@@ -444,22 +482,26 @@ public class EosIntegrationTest {
public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
// this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to emit all 40 update records into the output topic
- // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+ //
+ // the app first commits after each 10 records per partition (total 20 records), and thus will have 2 * 5 uncommitted writes
// and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
- // in the uncommitted batch sending some data for the new key to validate that upon resuming they will not be shown up in the store
+ // in the uncommitted batch, sending some data for the new key to validate that upon resuming they will not show up in the store
//
- // the failure gets inject after 20 committed and 10 uncommitted records got received
+ // the failure gets injected after 20 committed and 30 uncommitted records got received
// -> the failure only kills one thread
// after fail over, we should read 40 committed records and the state stores should contain the correct sums
// per key (even if some records got processed twice)
- try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig)) {
+ // We need more processing time in the "with state" situation, so increase max.poll.interval.ms
+ // to avoid an unexpected rebalance during the test, which would trigger an unexpected fail over
+ try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig, 3 * MAX_POLL_INTERVAL_MS)) {
startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
- final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
+ final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(
+ committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size());
dataBeforeFailure.addAll(committedDataBeforeFailure);
dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
@@ -471,15 +513,36 @@ public class EosIntegrationTest {
() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
"SteamsTasks did not request commit.");
- writeInputData(uncommittedDataBeforeFailure);
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C
+ // p-1: ---> 10 rec + C
- final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
+ checkResultPerKey(
+ committedRecords,
+ computeExpectedResult(committedDataBeforeFailure),
+ "The committed records before failure do not match what expected");
+
+ writeInputData(uncommittedDataBeforeFailure);
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C + 5 rec (pending)
+ // p-1: ---> 10 rec + C + 5 rec (pending)
+
+ final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
- checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure));
- checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
- verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
+
+
+ checkResultPerKey(
+ uncommittedRecords,
+ expectedResultBeforeFailure,
+ "The uncommitted records before failure do not match what expected");
+ verifyStateStore(
+ streams,
+ getMaxPerKey(expectedResultBeforeFailure),
+ "The state store content before failure do not match what expected");
errorInjected.set(true);
writeInputData(dataAfterFailure);
@@ -488,6 +551,11 @@ public class EosIntegrationTest {
() -> uncaughtException != null, MAX_WAIT_TIME_MS,
"Should receive uncaught exception from one StreamThread.");
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C + 5 rec + C + 5 rec + C
+ // p-1: ---> 10 rec + C + 5 rec + C + 5 rec + C
+
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
CONSUMER_GROUP_ID + "_ALL");
@@ -496,19 +564,31 @@ public class EosIntegrationTest {
uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
CONSUMER_GROUP_ID);
- final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ final int allCommittedRecordsAfterRecoverySize = committedDataBeforeFailure.size() +
+ uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
+ final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
- checkResultPerKey(allCommittedRecords, expectedResult);
+ checkResultPerKey(
+ allCommittedRecords,
+ expectedResult,
+ "The all committed records after recovery do not match what expected");
+
checkResultPerKey(
committedRecordsAfterFailure,
- expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
+ expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()),
+ "The committed records after recovery do not match what expected");
+
+ verifyStateStore(
+ streams,
+ getMaxPerKey(expectedResult),
+ "The state store content after recovery do not match what expected");
- verifyStateStore(streams, getMaxPerKey(expectedResult));
+ assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, is(false));
}
}
@@ -516,9 +596,10 @@ public class EosIntegrationTest {
public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
// this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
// the app is supposed to copy all 60 records into the output topic
- // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
//
- // a stall gets injected after 20 committed and 30 uncommitted records got received
+ // the app first commits after each 10 records per partition, and thus will have 2 * 5 uncommitted writes
+ //
+ // Then, a stall gets injected after 20 committed and 30 uncommitted records got received
// -> the stall only affects one thread and should trigger a rebalance
// after rebalancing, we should read 40 committed records (even if 50 records got written)
//
@@ -526,8 +607,8 @@ public class EosIntegrationTest {
// we write the remaining 20 records and verify to read 60 result records
try (
- final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig);
- final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig)
+ final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig, MAX_POLL_INTERVAL_MS);
+ final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig, MAX_POLL_INTERVAL_MS)
) {
startKafkaStreamsAndWaitForRunningState(streams1, MAX_WAIT_TIME_MS);
startKafkaStreamsAndWaitForRunningState(streams2, MAX_WAIT_TIME_MS);
@@ -535,7 +616,8 @@ public class EosIntegrationTest {
final List<KeyValue<Long, Long>> committedDataBeforeStall = prepareData(0L, 10L, 0L, 1L);
final List<KeyValue<Long, Long>> uncommittedDataBeforeStall = prepareData(10L, 15L, 0L, 1L);
- final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>();
+ final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>(
+ committedDataBeforeStall.size() + uncommittedDataBeforeStall.size());
dataBeforeStall.addAll(committedDataBeforeStall);
dataBeforeStall.addAll(uncommittedDataBeforeStall);
@@ -549,13 +631,29 @@ public class EosIntegrationTest {
() -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
"SteamsTasks did not request commit.");
- writeInputData(uncommittedDataBeforeStall);
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C
+ // p-1: ---> 10 rec + C
- final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeStall.size(), CONSUMER_GROUP_ID);
+ checkResultPerKey(
+ committedRecords,
+ committedDataBeforeStall,
+ "The committed records before stall do not match what expected");
+
+ writeInputData(uncommittedDataBeforeStall);
- checkResultPerKey(committedRecords, committedDataBeforeStall);
- checkResultPerKey(uncommittedRecords, dataBeforeStall);
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C + 5 rec (pending)
+ // p-1: ---> 10 rec + C + 5 rec (pending)
+
+ final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
+ checkResultPerKey(
+ uncommittedRecords,
+ dataBeforeStall,
+ "The uncommitted records before stall do not match what expected");
LOG.info("Injecting Stall");
stallInjected.set(true);
@@ -591,15 +689,24 @@ public class EosIntegrationTest {
"Streams1[" + streams1.allMetadata() + "]\n" +
"Streams2[" + streams2.allMetadata() + "]");
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C + 5 rec + C + 5 rec + C
+ // p-1: ---> 10 rec + C + 5 rec + C + 5 rec + C
+
final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size(),
CONSUMER_GROUP_ID);
- final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
+ final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>(
+ uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size());
expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeStall);
expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
- checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
+ checkResultPerKey(
+ committedRecordsAfterRebalance,
+ expectedCommittedRecordsAfterRebalance,
+ "The all committed records after rebalance do not match what expected");
LOG.info("Releasing Stall");
doStall = false;
@@ -618,25 +725,36 @@ public class EosIntegrationTest {
writeInputData(dataAfterSecondRebalance);
+ // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
+ //
+ // p-0: ---> 10 rec + C + 5 rec + C + 5 rec + C + 10 rec + C
+ // p-1: ---> 10 rec + C + 5 rec + C + 5 rec + C + 10 rec + C
+
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeStall.size() + uncommittedDataBeforeStall.size()
+ dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size(),
CONSUMER_GROUP_ID + "_ALL");
- final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ final int allCommittedRecordsAfterRecoverySize = committedDataBeforeStall.size() +
+ uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size();
+ final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeStall);
allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeStall);
allExpectedCommittedRecordsAfterRecovery.addAll(dataToTriggerFirstRebalance);
allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
- checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
+ checkResultPerKey(
+ allCommittedRecords,
+ allExpectedCommittedRecordsAfterRecovery,
+ "The all committed records after recovery do not match what expected");
}
}
private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
final long toExclusive,
final Long... keys) {
- final List<KeyValue<Long, Long>> data = new ArrayList<>();
+ final Long dataSize = keys.length * (toExclusive - fromInclusive);
+ final List<KeyValue<Long, Long>> data = new ArrayList<>(dataSize.intValue());
for (final Long k : keys) {
for (long v = fromInclusive; v < toExclusive; ++v) {
@@ -651,7 +769,8 @@ public class EosIntegrationTest {
final boolean withState,
final String appDir,
final int numberOfStreamsThreads,
- final String eosConfig) {
+ final String eosConfig,
+ final int maxPollIntervalMs) {
commitRequested = new AtomicInteger(0);
errorInjected = new AtomicBoolean(false);
stallInjected = new AtomicBoolean(false);
@@ -748,7 +867,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
- properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+ properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
@@ -763,9 +882,9 @@ public class EosIntegrationTest {
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setUncaughtExceptionHandler((t, e) -> {
- if (uncaughtException != null) {
+ if (uncaughtException != null || !e.getMessage().contains("Injected test exception")) {
e.printStackTrace(System.err);
- fail("Should only get one uncaught exception from Streams.");
+ hasUnexpectedError = true;
}
uncaughtException = e;
});
@@ -847,16 +966,17 @@ public class EosIntegrationTest {
}
private void verifyStateStore(final KafkaStreams streams,
- final Set<KeyValue<Long, Long>> expectedStoreContent) throws Exception {
+ final Set<KeyValue<Long, Long>> expectedStoreContent,
+ final String reason) throws Exception {
final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
.getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore());
assertNotNull(store);
final KeyValueIterator<Long, Long> it = store.all();
while (it.hasNext()) {
- assertTrue(expectedStoreContent.remove(it.next()));
+ assertTrue(reason, expectedStoreContent.remove(it.next()));
}
- assertTrue(expectedStoreContent.isEmpty());
+ assertTrue(reason, expectedStoreContent.isEmpty());
}
}
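
A closing note on the uncaught-exception-handler change above: the old handler called JUnit's fail(), but an AssertionError thrown on a StreamThread never reaches the JUnit runner, so the test could pass despite an unexpected error. The patch instead records the problem in the volatile hasUnexpectedError flag and asserts on it from the test thread. Below is a minimal standalone sketch of the same pattern; the worker body and message are illustrative, not from this commit:

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class BackgroundErrorFlagSketch {
    private volatile boolean hasUnexpectedError = false;

    public void runAndVerify() throws InterruptedException {
        final Thread worker = new Thread(() -> {
            // simulated background failure (illustrative)
            throw new IllegalStateException("Injected test exception");
        });
        worker.setUncaughtExceptionHandler((thread, error) -> {
            // tolerate the injected failure; flag anything else as unexpected
            if (!error.getMessage().contains("Injected test exception")) {
                hasUnexpectedError = true;
            }
        });
        worker.start();
        worker.join();
        // assert on the calling (test) thread, where the runner can see it
        assertThat("Should not see an unexpected error from the worker.",
                   hasUnexpectedError, is(false));
    }

    public static void main(final String[] args) throws InterruptedException {
        new BackgroundErrorFlagSketch().runAndVerify();
    }
}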