You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/14 16:19:02 UTC
kafka git commit: KAFKA-5361: Add more integration tests for Streams EOS
Repository: kafka
Updated Branches:
refs/heads/trunk b20d9333b -> 160933bc0
KAFKA-5361: Add more integration tests for Streams EOS
- multi-subtopology tests
- fencing test
- producer fenced bug fix: Streams did not recover correctly from ProducerFencedException
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3276 from mjsax/kafka-5361-add-eos-integration-tests-for-streams-api
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/160933bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/160933bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/160933bc
Branch: refs/heads/trunk
Commit: 160933bc092b4392cac1dd9497010320c6ae4d10
Parents: b20d933
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Jun 14 09:18:59 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 14 09:18:59 2017 -0700
----------------------------------------------------------------------
.../internals/RecordCollectorImpl.java | 10 +-
.../processor/internals/StreamThread.java | 7 +-
.../streams/integration/EosIntegrationTest.java | 484 ++++++++++---------
3 files changed, 270 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/160933bc/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index e1a86b4..d49cf58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
@@ -106,7 +107,11 @@ public class RecordCollectorImpl implements RecordCollector {
} else {
if (sendException == null) {
sendException = exception;
- log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
+ if (sendException instanceof ProducerFencedException) {
+ log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and it will be closed as it is a zombie.", logPrefix, topic, exception);
+ } else {
+ log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
+ }
}
}
}
@@ -125,6 +130,9 @@ public class RecordCollectorImpl implements RecordCollector {
private void checkForException() {
if (sendException != null) {
+ if (sendException instanceof ProducerFencedException) {
+ throw (ProducerFencedException) sendException;
+ }
throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/160933bc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 23ce958..39369c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -259,7 +259,7 @@ public class StreamThread extends Thread {
} catch (final LockException e) {
// ignore and retry
if (!retryingTasks.contains(taskId)) {
- log.warn("{} Could not create task {}. Will retry. {}", logPrefix, taskId, e);
+ log.warn("{} Could not create task {} due to {}. Will retry.", logPrefix, taskId, e.getMessage());
retryingTasks.add(taskId);
}
}
@@ -1427,7 +1427,10 @@ public class StreamThread extends Thread {
try {
task.close(false);
} catch (final Exception f) {
- log.warn("{} Failed to close zombie task: ", logPrefix, f);
+ if (!log.isDebugEnabled() && !log.isTraceEnabled()) {
+ log.warn("{} Failed to close zombie task: {}", logPrefix, f.getMessage());
+ }
+ log.debug("{} Failed to close zombie task: ", logPrefix, f);
}
activeTasks.remove(task.id);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/160933bc/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 451d5e3..c7f4ff4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -48,6 +47,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -67,20 +67,30 @@ import static org.junit.Assert.fail;
@Category({IntegrationTest.class})
public class EosIntegrationTest {
private static final int NUM_BROKERS = 3;
+ private static final int MAX_POLL_INTERVAL_MS = 5 * 1000;
+ private static final int MAX_WAIT_TIME_MS = 60 * 1000;
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
+ {
+ put("auto.create.topics.enable", false);
+ }
+ });
private static String applicationId;
private final static String CONSUMER_GROUP_ID = "readCommitted";
private final static String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
- private final static String MULTI_PARTITION_INPUT_TOPIC = "inputTopic";
+ private final static String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
+ private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
private final static int NUM_TOPIC_PARTITIONS = 2;
- private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "outputTopic";
+ private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
+ private final static String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
private final String storeName = "store";
private AtomicBoolean errorInjected;
+ private AtomicBoolean injectGC;
+ private volatile boolean doGC = true;
private AtomicInteger commitRequested;
private Throwable uncaughtException;
@@ -89,45 +99,58 @@ public class EosIntegrationTest {
@Before
public void createTopics() throws Exception {
applicationId = "appId-" + ++testNumber;
- CLUSTER.deleteTopicsAndWait(SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
+ CLUSTER.deleteTopicsAndWait(
+ SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
+ SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC,
+ SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
- CLUSTER.createTopics(SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ CLUSTER.createTopics(SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
+ CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC, NUM_TOPIC_PARTITIONS, 1);
+ CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
}
@Test
public void shouldBeAbleToRunWithEosEnabled() throws Exception {
- runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
}
@Test
public void shouldBeAbleToRestartAfterClose() throws Exception {
- runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
}
@Test
public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
- runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, null, MULTI_PARTITION_OUTPUT_TOPIC);
}
@Test
public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
- runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, null, SINGLE_PARTITION_OUTPUT_TOPIC);
+ }
+
+ @Test
+ public void shouldBeAbleToRunWithTwoSubtopologies() throws Exception {
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_THROUGH_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ }
+
+ @Test
+ public void shouldBeAbleToRunWithTwoSubtopologiesAndMultiplePartitions() throws Exception {
+ runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, MULTI_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
}
private void runSimpleCopyTest(final int numberOfRestarts,
final String inputTopic,
+ final String throughTopic,
final String outputTopic) throws Exception {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<Long, Long> input = builder.stream(inputTopic);
- input
- .mapValues(new ValueMapper<Long, Long>() {
- @Override
- public Long apply(final Long value) {
- return value;
- }
- })
- .to(outputTopic);
+ KStream<Long, Long> output = input;
+ if (throughTopic != null) {
+ output = input.through(throughTopic);
+ }
+ output.to(outputTopic);
for (int i = 0; i < numberOfRestarts; ++i) {
final long factor = i;
@@ -140,6 +163,7 @@ public class EosIntegrationTest {
Serdes.LongSerde.class.getName(),
new Properties() {
{
+ put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
}
}));
@@ -147,16 +171,7 @@ public class EosIntegrationTest {
try {
streams.start();
- final List<KeyValue<Long, Long>> inputData = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, factor * 100));
- add(new KeyValue<>(0L, factor * 100 + 1L));
- add(new KeyValue<>(0L, factor * 100 + 2L));
- add(new KeyValue<>(1L, factor * 100));
- add(new KeyValue<>(1L, factor * 100 + 1L));
- add(new KeyValue<>(1L, factor * 100 + 2L));
- }
- };
+ final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopic,
@@ -239,22 +254,8 @@ public class EosIntegrationTest {
try {
streams.start();
- final List<KeyValue<Long, Long>> firstBurstOfData = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, 0L));
- add(new KeyValue<>(0L, 1L));
- add(new KeyValue<>(0L, 2L));
- add(new KeyValue<>(0L, 3L));
- add(new KeyValue<>(0L, 4L));
- }
- };
- final List<KeyValue<Long, Long>> secondBurstOfData = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, 5L));
- add(new KeyValue<>(0L, 6L));
- add(new KeyValue<>(0L, 7L));
- }
- };
+ final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
+ final List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);
IntegrationTestUtils.produceKeyValuesSynchronously(
SINGLE_PARTITION_INPUT_TOPIC,
@@ -320,67 +321,18 @@ public class EosIntegrationTest {
// -> the failure only kills one thread
// after fail over, we should read 40 committed records (even if 50 record got written)
- final KafkaStreams streams = getKafkaStreams(false);
+ final KafkaStreams streams = getKafkaStreams(false, "appDir", 2);
try {
streams.start();
- final List<KeyValue<Long, Long>> committedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, 0L));
- add(new KeyValue<>(0L, 1L));
- add(new KeyValue<>(0L, 2L));
- add(new KeyValue<>(0L, 3L));
- add(new KeyValue<>(0L, 4L));
- add(new KeyValue<>(0L, 5L));
- add(new KeyValue<>(0L, 6L));
- add(new KeyValue<>(0L, 7L));
- add(new KeyValue<>(0L, 8L));
- add(new KeyValue<>(0L, 9L));
- add(new KeyValue<>(1L, 0L));
- add(new KeyValue<>(1L, 1L));
- add(new KeyValue<>(1L, 2L));
- add(new KeyValue<>(1L, 3L));
- add(new KeyValue<>(1L, 4L));
- add(new KeyValue<>(1L, 5L));
- add(new KeyValue<>(1L, 6L));
- add(new KeyValue<>(1L, 7L));
- add(new KeyValue<>(1L, 8L));
- add(new KeyValue<>(1L, 9L));
- }
- };
- final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, 10L));
- add(new KeyValue<>(0L, 11L));
- add(new KeyValue<>(0L, 12L));
- add(new KeyValue<>(0L, 13L));
- add(new KeyValue<>(0L, 14L));
- add(new KeyValue<>(1L, 10L));
- add(new KeyValue<>(1L, 11L));
- add(new KeyValue<>(1L, 12L));
- add(new KeyValue<>(1L, 13L));
- add(new KeyValue<>(1L, 14L));
- }
- };
+ final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
+ final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
dataBeforeFailure.addAll(committedDataBeforeFailure);
dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
- final List<KeyValue<Long, Long>> dataAfterFailure = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, 15L));
- add(new KeyValue<>(0L, 16L));
- add(new KeyValue<>(0L, 17L));
- add(new KeyValue<>(0L, 18L));
- add(new KeyValue<>(0L, 19L));
- add(new KeyValue<>(1L, 15L));
- add(new KeyValue<>(1L, 16L));
- add(new KeyValue<>(1L, 17L));
- add(new KeyValue<>(1L, 18L));
- add(new KeyValue<>(1L, 19L));
- }
- };
+ final List<KeyValue<Long, Long>> dataAfterFailure = prepareData(15L, 20L, 0L, 1L);
writeInputData(committedDataBeforeFailure);
@@ -389,7 +341,7 @@ public class EosIntegrationTest {
public boolean conditionMet() {
return commitRequested.get() == 2;
}
- }, 60000, "SteamsTasks did not request commit.");
+ }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
writeInputData(uncommittedDataBeforeFailure);
@@ -407,7 +359,7 @@ public class EosIntegrationTest {
public boolean conditionMet() {
return uncaughtException != null;
}
- }, 60000, "Should receive uncaught exception from one StreamThread.");
+ }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread.");
final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
@@ -433,9 +385,199 @@ public class EosIntegrationTest {
}
}
- private KafkaStreams getKafkaStreams(final boolean withState) {
+ @Test
+ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
+ // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
+ // the app is supposed to emit all 40 update records into the output topic
+ // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+ // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
+ //
+ // the failure gets inject after 20 committed and 30 uncommitted records got received
+ // -> the failure only kills one thread
+ // after fail over, we should read 40 committed records and the state stores should contain the correct sums
+ // per key (even if some records got processed twice)
+
+ final KafkaStreams streams = getKafkaStreams(true, "appDir", 2);
+ try {
+ streams.start();
+
+ final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
+ final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
+
+ final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
+ dataBeforeFailure.addAll(committedDataBeforeFailure);
+ dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
+
+ final List<KeyValue<Long, Long>> dataAfterFailure = prepareData(15L, 20L, 0L, 1L);
+
+ writeInputData(committedDataBeforeFailure);
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return commitRequested.get() == 2;
+ }
+ }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
+
+ writeInputData(uncommittedDataBeforeFailure);
+
+ final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
+ final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
+
+ final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
+ checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure));
+ checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
+ verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
+
+ errorInjected.set(true);
+ writeInputData(dataAfterFailure);
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return uncaughtException != null;
+ }
+ }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread.");
+
+ final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
+ committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
+ CONSUMER_GROUP_ID + "_ALL");
+
+ final List<KeyValue<Long, Long>> committedRecordsAfterFailure = readResult(
+ uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
+ CONSUMER_GROUP_ID);
+
+ final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
+ allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
+ allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
+
+ final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
+
+ checkResultPerKey(allCommittedRecords, expectedResult);
+ checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
+
+ verifyStateStore(streams, getMaxPerKey(expectedResult));
+ } finally {
+ streams.close();
+ }
+ }
+
+ @Test
+ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
+ // this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
+ // the app is supposed to copy all 60 records into the output topic
+ // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+ //
+ // a GC pause gets inject after 20 committed and 30 uncommitted records got received
+ // -> the GC pause only affects one thread and should trigger a rebalance
+ // after rebalancing, we should read 40 committed records (even if 50 record got written)
+ //
+ // afterwards, the "stalling" thread resumes, and another rebalance should get triggered
+ // we write the remaining 20 records and verify to read 60 result records
+
+ final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1);
+ final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1);
+ try {
+ streams1.start();
+ streams2.start();
+
+ final List<KeyValue<Long, Long>> committedDataBeforeGC = prepareData(0L, 10L, 0L, 1L);
+ final List<KeyValue<Long, Long>> uncommittedDataBeforeGC = prepareData(10L, 15L, 0L, 1L);
+
+ final List<KeyValue<Long, Long>> dataBeforeGC = new ArrayList<>();
+ dataBeforeGC.addAll(committedDataBeforeGC);
+ dataBeforeGC.addAll(uncommittedDataBeforeGC);
+
+ final List<KeyValue<Long, Long>> dataToTriggerFirstRebalance = prepareData(15L, 20L, 0L, 1L);
+
+ final List<KeyValue<Long, Long>> dataAfterSecondRebalance = prepareData(20L, 30L, 0L, 1L);
+
+ writeInputData(committedDataBeforeGC);
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return commitRequested.get() == 2;
+ }
+ }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
+
+ writeInputData(uncommittedDataBeforeGC);
+
+ final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeGC.size(), null);
+ final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeGC.size(), CONSUMER_GROUP_ID);
+
+ checkResultPerKey(committedRecords, committedDataBeforeGC);
+ checkResultPerKey(uncommittedRecords, dataBeforeGC);
+
+ injectGC.set(true);
+ writeInputData(dataToTriggerFirstRebalance);
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1 &&
+ (streams1.allMetadata().iterator().next().topicPartitions().size() == 2
+ || streams2.allMetadata().iterator().next().topicPartitions().size() == 2);
+ }
+ }, MAX_WAIT_TIME_MS, "Should have rebalanced.");
+
+ final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
+ uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(),
+ CONSUMER_GROUP_ID);
+
+ final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
+ expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeGC);
+ expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
+
+ checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
+
+ doGC = false;
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1
+ && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
+ && streams2.allMetadata().iterator().next().topicPartitions().size() == 1;
+ }
+ }, MAX_WAIT_TIME_MS, "Should have rebalanced.");
+
+ writeInputData(dataAfterSecondRebalance);
+
+ final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
+ committedDataBeforeGC.size() + uncommittedDataBeforeGC.size()
+ + dataToTriggerFirstRebalance.size() + dataAfterSecondRebalance.size(),
+ CONSUMER_GROUP_ID + "_ALL");
+
+ final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeGC);
+ allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeGC);
+ allExpectedCommittedRecordsAfterRecovery.addAll(dataToTriggerFirstRebalance);
+ allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
+
+ checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
+ } finally {
+ streams1.close();
+ streams2.close();
+ }
+ }
+
+ private List<KeyValue<Long, Long>> prepareData(final long fromInclusive, final long toExclusive, final Long... keys) {
+ final List<KeyValue<Long, Long>> data = new ArrayList<>();
+
+ for (final Long k : keys) {
+ for (long v = fromInclusive; v < toExclusive; ++v) {
+ data.add(new KeyValue<>(k, v));
+ }
+ }
+
+ return data;
+ }
+
+ private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) {
commitRequested = new AtomicInteger(0);
errorInjected = new AtomicBoolean(false);
+ injectGC = new AtomicBoolean(false);
final KStreamBuilder builder = new KStreamBuilder();
String[] storeNames = null;
@@ -449,6 +591,7 @@ public class EosIntegrationTest {
builder.addStateStore(storeSupplier);
}
+
final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
@SuppressWarnings("unchecked")
@@ -473,6 +616,15 @@ public class EosIntegrationTest {
// only tries to fail once on one of the task
throw new RuntimeException("Injected test exception.");
}
+ if (injectGC.compareAndSet(true, false)) {
+ while (doGC) {
+ try {
+ Thread.sleep(100);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
if ((value + 1) % 10 == 0) {
context.commit();
@@ -514,11 +666,14 @@ public class EosIntegrationTest {
new Properties() {
{
put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
- put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+ put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, -1);
put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+ put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
+ put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:2142");
}
}));
@@ -572,133 +727,6 @@ public class EosIntegrationTest {
);
}
- @Test
- public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
- // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
- // the app is supposed to emit all 40 update records into the output topic
- // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
- // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
- //
- // the failure gets inject after 20 committed and 30 uncommitted records got received
- // -> the failure only kills one thread
- // after fail over, we should read 40 committed records and the state stores should contain the correct sums
- // per key (even if some records got processed twice)
-
- final KafkaStreams streams = getKafkaStreams(true);
- try {
- streams.start();
-
- final List<KeyValue<Long, Long>> committedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, 0L));
- add(new KeyValue<>(0L, 1L));
- add(new KeyValue<>(0L, 2L));
- add(new KeyValue<>(0L, 3L));
- add(new KeyValue<>(0L, 4L));
- add(new KeyValue<>(0L, 5L));
- add(new KeyValue<>(0L, 6L));
- add(new KeyValue<>(0L, 7L));
- add(new KeyValue<>(0L, 8L));
- add(new KeyValue<>(0L, 9L));
- add(new KeyValue<>(1L, 0L));
- add(new KeyValue<>(1L, 1L));
- add(new KeyValue<>(1L, 2L));
- add(new KeyValue<>(1L, 3L));
- add(new KeyValue<>(1L, 4L));
- add(new KeyValue<>(1L, 5L));
- add(new KeyValue<>(1L, 6L));
- add(new KeyValue<>(1L, 7L));
- add(new KeyValue<>(1L, 8L));
- add(new KeyValue<>(1L, 9L));
- }
- };
- final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, 10L));
- add(new KeyValue<>(0L, 11L));
- add(new KeyValue<>(0L, 12L));
- add(new KeyValue<>(0L, 13L));
- add(new KeyValue<>(0L, 14L));
- add(new KeyValue<>(1L, 10L));
- add(new KeyValue<>(1L, 11L));
- add(new KeyValue<>(1L, 12L));
- add(new KeyValue<>(1L, 13L));
- add(new KeyValue<>(1L, 14L));
- }
- };
-
- final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
- dataBeforeFailure.addAll(committedDataBeforeFailure);
- dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
-
- final List<KeyValue<Long, Long>> dataAfterFailure = new ArrayList<KeyValue<Long, Long>>() {
- {
- add(new KeyValue<>(0L, 15L));
- add(new KeyValue<>(0L, 16L));
- add(new KeyValue<>(0L, 17L));
- add(new KeyValue<>(0L, 18L));
- add(new KeyValue<>(0L, 19L));
- add(new KeyValue<>(1L, 15L));
- add(new KeyValue<>(1L, 16L));
- add(new KeyValue<>(1L, 17L));
- add(new KeyValue<>(1L, 18L));
- add(new KeyValue<>(1L, 19L));
- }
- };
-
- writeInputData(committedDataBeforeFailure);
-
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return commitRequested.get() == 2;
- }
- }, 60000, "SteamsTasks did not request commit.");
-
- writeInputData(uncommittedDataBeforeFailure);
-
- final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
- final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
-
- final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
- checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure));
- checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
- verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
-
- errorInjected.set(true);
- writeInputData(dataAfterFailure);
-
- TestUtils.waitForCondition(new TestCondition() {
- @Override
- public boolean conditionMet() {
- return uncaughtException != null;
- }
- }, 60000, "Should receive uncaught exception from one StreamThread.");
-
- final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
- committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
- CONSUMER_GROUP_ID + "_ALL");
-
- final List<KeyValue<Long, Long>> committedRecordsAfterFailure = readResult(
- uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
- CONSUMER_GROUP_ID);
-
- final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
- allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
- allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
- allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
-
- final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
-
- checkResultPerKey(allCommittedRecords, expectedResult);
- checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
-
- verifyStateStore(streams, getMaxPerKey(expectedResult));
- } finally {
- streams.close();
- }
- }
-
private List<KeyValue<Long, Long>> computeExpectedResult(final List<KeyValue<Long, Long>> input) {
final List<KeyValue<Long, Long>> expectedResult = new ArrayList<>(input.size());