Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/01 20:16:45 UTC
kafka git commit: KAFKA-5361: Add EOS integration tests for Streams API
Repository: kafka
Updated Branches:
refs/heads/trunk 1959835d9 -> 39c1e6259
KAFKA-5361: Add EOS integration tests for Streams API
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3193 from mjsax/kafka-5361-add-eos-integration-tests-for-streams-api
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39c1e625
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39c1e625
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39c1e625
Branch: refs/heads/trunk
Commit: 39c1e6259c8ff547c3f7c5165b43c88f9f3ea158
Parents: 1959835
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jun 1 13:16:42 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 1 13:16:42 2017 -0700
----------------------------------------------------------------------
.../main/scala/kafka/serializer/Decoder.scala | 18 +
.../streams/processor/internals/StreamTask.java | 13 +-
.../processor/internals/StreamThread.java | 2 +-
.../streams/integration/EosIntegrationTest.java | 782 +++++++++++++++++++
.../processor/internals/StreamTaskTest.java | 11 +
5 files changed, 823 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c1e625/core/src/main/scala/kafka/serializer/Decoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala
index 164c3fa..5dfad26 100644
--- a/core/src/main/scala/kafka/serializer/Decoder.scala
+++ b/core/src/main/scala/kafka/serializer/Decoder.scala
@@ -17,6 +17,8 @@
package kafka.serializer
+import java.nio.ByteBuffer
+
import kafka.utils.VerifiableProperties
/**
@@ -50,3 +52,19 @@ class StringDecoder(props: VerifiableProperties = null) extends Decoder[String]
new String(bytes, encoding)
}
}
+
+/**
+ * The long decoder translates bytes into longs. It reads eight bytes in big-endian order;
+ * the optional serializer.encoding property parsed below is not actually used for decoding.
+ */
+class LongDecoder(props: VerifiableProperties = null) extends Decoder[Long] {
+ val encoding =
+ if(props == null)
+ "UTF8"
+ else
+ props.getString("serializer.encoding", "UTF8")
+
+ def fromBytes(bytes: Array[Byte]): Long = {
+ ByteBuffer.wrap(bytes).getLong
+ }
+}
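For reference, fromBytes above assumes the eight-byte big-endian layout that ByteBuffer.getLong reads, which is also the layout Kafka's LongSerializer produces. A minimal round-trip sketch (not from the patch; the class name is illustrative):

    import java.nio.ByteBuffer;

    public class LongBytesSketch {
        public static void main(final String[] args) {
            // encode a long the way LongSerializer does: eight bytes, big-endian
            final byte[] bytes = ByteBuffer.allocate(8).putLong(42L).array();
            // decode it the way LongDecoder.fromBytes does
            final long decoded = ByteBuffer.wrap(bytes).getLong();
            System.out.println(decoded); // prints 42
        }
    }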
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c1e625/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 731030d..09f734b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -63,6 +63,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
private boolean commitRequested = false;
private boolean commitOffsetNeeded = false;
+ private boolean transactionInFlight = false;
private final Time time;
private final TaskMetrics metrics;
@@ -139,8 +140,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
initializeStateStores();
stateMgr.registerGlobalStateStores(topology.globalStateStores());
if (eosEnabled) {
- producer.initTransactions();
- producer.beginTransaction();
+ this.producer.initTransactions();
+ this.producer.beginTransaction();
+ transactionInFlight = true;
}
initTopology();
processorContext.initialized();
@@ -157,6 +159,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
log.debug("{} Resuming", logPrefix);
if (eosEnabled) {
producer.beginTransaction();
+ transactionInFlight = true;
}
initTopology();
}
@@ -294,7 +297,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
if (eosEnabled) {
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
producer.commitTransaction();
+ transactionInFlight = false;
if (startNewTransaction) {
+ transactionInFlight = true;
producer.beginTransaction();
}
} else {
@@ -306,6 +311,9 @@ public class StreamTask extends AbstractTask implements Punctuator {
}
}
commitOffsetNeeded = false;
} else if (eosEnabled && !startNewTransaction && transactionInFlight) { // make sure to commit an open transaction when suspending
+ producer.commitTransaction();
+ transactionInFlight = false;
}
commitRequested = false;
@@ -424,6 +432,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
if (!clean) {
try {
producer.abortTransaction();
+ transactionInFlight = false;
} catch (final ProducerFencedException e) {
// can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens
}
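The new transactionInFlight flag mirrors whether beginTransaction() has been called without a matching commit or abort, so the suspend and close paths above can settle an open transaction exactly once. A standalone sketch of the pattern using MockProducer, which the new unit test at the end of this commit also relies on (simplified; the real StreamTask also sends consumed offsets to the transaction before committing):

    import org.apache.kafka.clients.producer.MockProducer;

    public class TransactionFlagSketch {
        public static void main(final String[] args) {
            final MockProducer<byte[], byte[]> producer = new MockProducer<>();
            producer.initTransactions();         // once, at task initialization
            producer.beginTransaction();
            boolean transactionInFlight = true;  // set whenever beginTransaction() succeeds

            // ... records would be sent and offsets added to the transaction here ...

            // suspend path: commit only if a transaction is actually open,
            // then clear the flag so a later close cannot commit or abort twice
            if (transactionInFlight) {
                producer.commitTransaction();
                transactionInFlight = false;
            }

            System.out.println(producer.transactionCommitted()); // prints true
        }
    }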
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c1e625/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a453e49..44cd1b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -747,7 +747,7 @@ public class StreamThread extends Thread {
// note that once we set recordsProcessedBeforeCommit, it will never be UNLIMITED_RECORDS again, so
// we will never process all records again. This might be an issue if the initial measurement
// was off due to a slow start.
- if (processLatency > commitTime) {
+ if (processLatency > 0 && processLatency > commitTime) {
// push down
recordsProcessedBeforeCommit = Math.max(1, (commitTime * totalProcessed) / processLatency);
log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}",
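To see what the guarded branch computes, here is the same arithmetic with made-up numbers: a 30 second commit time, 1000 records processed, and 60 seconds of measured processing latency. The added processLatency > 0 check matters because the EOS test below configures COMMIT_INTERVAL_MS_CONFIG = -1; without it, a zero latency would pass the old comparison and divide by zero.

    public class CommitSizeSketch {
        public static void main(final String[] args) {
            final long commitTime = 30_000L;     // hypothetical commit time in ms
            final long totalProcessed = 1_000L;  // hypothetical records processed
            final long processLatency = 60_000L; // hypothetical measured latency in ms

            if (processLatency > 0 && processLatency > commitTime) {
                // scale the batch size down proportionally to how far latency overshoots
                final long recordsProcessedBeforeCommit =
                    Math.max(1, (commitTime * totalProcessed) / processLatency);
                System.out.println(recordsProcessedBeforeCommit); // prints 500
            }
        }
    }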
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c1e625/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
new file mode 100644
index 0000000..0f5b3dd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({IntegrationTest.class})
+public class EosIntegrationTest {
+ private static final int NUM_BROKERS = 3;
+
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+ private static String applicationId;
+ private final static String CONSUMER_GROUP_ID = "readCommitted";
+ private final static String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
+ private final static String MULTI_PARTITION_INPUT_TOPIC = "inputTopic";
+ private final static int NUM_TOPIC_PARTITIONS = 2;
+ private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "outputTopic";
+ private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
+ private final String storeName = "store";
+
+ private final Map<Integer, Integer> maxPartitionNumberSeen = Collections.synchronizedMap(new HashMap<Integer, Integer>());
+ private boolean injectError = false;
+ private AtomicInteger commitRequested;
+ private Throwable uncaughtException;
+
+ private int testNumber = 0;
+
+ @Before
+ public void createTopics() throws Exception {
+ applicationId = "appId-" + ++testNumber;
+ CLUSTER.deleteTopicsAndWait(SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
+
+ CLUSTER.createTopics(SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
+ }
+
+ @Test
+ public void shouldBeAbleToRunWithEosEnabled() throws Exception {
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ }
+
+ @Test
+ public void shouldBeAbleToRestartAfterClose() throws Exception {
+ runSimpleCopyTest(2, SINGLE_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ }
+
+ @Test
+ public void shouldBeAbleToCommitToMultiplePartitions() throws Exception {
+ runSimpleCopyTest(1, SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
+ }
+
+ @Test
+ public void shouldBeAbleToCommitMultiplePartitionOffsets() throws Exception {
+ runSimpleCopyTest(1, MULTI_PARTITION_INPUT_TOPIC, SINGLE_PARTITION_OUTPUT_TOPIC);
+ }
+
+ private void runSimpleCopyTest(final int numberOfRestarts,
+ final String inputTopic,
+ final String outputTopic) throws Exception {
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream<Long, Long> input = builder.stream(inputTopic);
+ input
+ .mapValues(new ValueMapper<Long, Long>() {
+ @Override
+ public Long apply(final Long value) {
+ return value;
+ }
+ })
+ .to(outputTopic);
+
+ for (int i = 0; i < numberOfRestarts; ++i) {
+ final long factor = i;
+ final KafkaStreams streams = new KafkaStreams(
+ builder,
+ StreamsTestUtils.getStreamsConfig(
+ applicationId,
+ CLUSTER.bootstrapServers(),
+ Serdes.LongSerde.class.getName(),
+ Serdes.LongSerde.class.getName(),
+ new Properties() {
+ {
+ put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ }
+ }));
+
+ try {
+ streams.start();
+
+ final List<KeyValue<Long, Long>> inputData = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, factor * 100));
+ add(new KeyValue<>(0L, factor * 100 + 1L));
+ add(new KeyValue<>(0L, factor * 100 + 2L));
+ add(new KeyValue<>(1L, factor * 100));
+ add(new KeyValue<>(1L, factor * 100 + 1L));
+ add(new KeyValue<>(1L, factor * 100 + 2L));
+ }
+ };
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ inputTopic,
+ inputData,
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
+ CLUSTER.time
+ );
+
+ final List<KeyValue<Long, Long>> committedRecords
+ = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ CONSUMER_GROUP_ID,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ new Properties() {
+ {
+ put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
+ }),
+ inputTopic,
+ inputData.size()
+ );
+
+ checkResultPerKey(committedRecords, inputData);
+ } finally {
+ streams.close();
+ }
+ }
+ }
+
+ private void checkResultPerKey(final List<KeyValue<Long, Long>> result,
+ final List<KeyValue<Long, Long>> expectedResult) {
+ final Set<Long> allKeys = new HashSet<>();
+ addAllKeys(allKeys, result);
+ addAllKeys(allKeys, expectedResult);
+
+ for (final Long key : allKeys) {
+ assertThat(getAllRecordPerKey(key, result), equalTo(getAllRecordPerKey(key, expectedResult)));
+ }
+
+ }
+
+ private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>> records) {
+ for (final KeyValue<Long, Long> record : records) {
+ allKeys.add(record.key);
+ }
+ }
+
+ private List<KeyValue<Long, Long>> getAllRecordPerKey(final Long key, final List<KeyValue<Long, Long>> records) {
+ final List<KeyValue<Long, Long>> recordsPerKey = new ArrayList<>(records.size());
+
+ for (final KeyValue<Long, Long> record : records) {
+ if (record.key.equals(key)) {
+ recordsPerKey.add(record);
+ }
+ }
+
+ return recordsPerKey;
+ }
+
+ @Test
+ public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
+
+ final KafkaStreams streams = new KafkaStreams(
+ builder,
+ StreamsTestUtils.getStreamsConfig(
+ applicationId,
+ CLUSTER.bootstrapServers(),
+ Serdes.LongSerde.class.getName(),
+ Serdes.LongSerde.class.getName(),
+ new Properties() {
+ {
+ put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ }
+ }));
+
+ try {
+ streams.start();
+
+ final List<KeyValue<Long, Long>> firstBurstOfData = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, 0L));
+ add(new KeyValue<>(0L, 1L));
+ add(new KeyValue<>(0L, 2L));
+ add(new KeyValue<>(0L, 3L));
+ add(new KeyValue<>(0L, 4L));
+ }
+ };
+ final List<KeyValue<Long, Long>> secondBurstOfData = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, 5L));
+ add(new KeyValue<>(0L, 6L));
+ add(new KeyValue<>(0L, 7L));
+ }
+ };
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ SINGLE_PARTITION_INPUT_TOPIC,
+ firstBurstOfData,
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
+ CLUSTER.time
+ );
+
+ final List<KeyValue<Long, Long>> firstCommittedRecords
+ = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ CONSUMER_GROUP_ID,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ new Properties() {
+ {
+ put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
+ }),
+ SINGLE_PARTITION_OUTPUT_TOPIC,
+ firstBurstOfData.size()
+ );
+
+ assertThat(firstCommittedRecords, equalTo(firstBurstOfData));
+
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ SINGLE_PARTITION_INPUT_TOPIC,
+ secondBurstOfData,
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
+ CLUSTER.time
+ );
+
+ final List<KeyValue<Long, Long>> secondCommittedRecords
+ = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ CONSUMER_GROUP_ID,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ new Properties() {
+ {
+ put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
+ }),
+ SINGLE_PARTITION_OUTPUT_TOPIC,
+ secondBurstOfData.size()
+ );
+
+ assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
+ } finally {
+ streams.close();
+ }
+ }
+
+ @Ignore
+ @Test
+ public void shouldNotViolateEosIfOneTaskFails() throws Exception {
+ // this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
+ // the app is supposed to copy all 40 records into the output topic
+ // the app commits after every 10 records per partition, and thus will have 2*5 uncommitted writes
+ //
+ // the failure gets injected after 20 committed and 30 uncommitted records have been received
+ // -> the failure only kills one thread
+ // after failover, we should read 40 committed records (even if 50 records got written)
+
+ final KafkaStreams streams = getKafkaStreams(false);
+ try {
+ streams.start();
+
+ final List<KeyValue<Long, Long>> committedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, 0L));
+ add(new KeyValue<>(0L, 1L));
+ add(new KeyValue<>(0L, 2L));
+ add(new KeyValue<>(0L, 3L));
+ add(new KeyValue<>(0L, 4L));
+ add(new KeyValue<>(0L, 5L));
+ add(new KeyValue<>(0L, 6L));
+ add(new KeyValue<>(0L, 7L));
+ add(new KeyValue<>(0L, 8L));
+ add(new KeyValue<>(0L, 9L));
+ add(new KeyValue<>(1L, 0L));
+ add(new KeyValue<>(1L, 1L));
+ add(new KeyValue<>(1L, 2L));
+ add(new KeyValue<>(1L, 3L));
+ add(new KeyValue<>(1L, 4L));
+ add(new KeyValue<>(1L, 5L));
+ add(new KeyValue<>(1L, 6L));
+ add(new KeyValue<>(1L, 7L));
+ add(new KeyValue<>(1L, 8L));
+ add(new KeyValue<>(1L, 9L));
+ }
+ };
+ final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, 10L));
+ add(new KeyValue<>(0L, 11L));
+ add(new KeyValue<>(0L, 12L));
+ add(new KeyValue<>(0L, 13L));
+ add(new KeyValue<>(0L, 14L));
+ add(new KeyValue<>(1L, 10L));
+ add(new KeyValue<>(1L, 11L));
+ add(new KeyValue<>(1L, 12L));
+ add(new KeyValue<>(1L, 13L));
+ add(new KeyValue<>(1L, 14L));
+ }
+ };
+
+ final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
+ dataBeforeFailure.addAll(committedDataBeforeFailure);
+ dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
+
+ final List<KeyValue<Long, Long>> dataAfterFailure = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, 15L));
+ add(new KeyValue<>(0L, 16L));
+ add(new KeyValue<>(0L, 17L));
+ add(new KeyValue<>(0L, 18L));
+ add(new KeyValue<>(0L, 19L));
+ add(new KeyValue<>(1L, 15L));
+ add(new KeyValue<>(1L, 16L));
+ add(new KeyValue<>(1L, 17L));
+ add(new KeyValue<>(1L, 18L));
+ add(new KeyValue<>(1L, 19L));
+ }
+ };
+
+ writeInputData(committedDataBeforeFailure);
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return commitRequested.get() == 2;
+ }
+ }, 60000, "StreamsTasks did not request commit.");
+
+ writeInputData(uncommittedDataBeforeFailure);
+
+ final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
+ final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
+
+ checkResultPerKey(committedRecords, committedDataBeforeFailure);
+ checkResultPerKey(uncommittedRecords, dataBeforeFailure);
+
+ injectError = true;
+ writeInputData(dataAfterFailure);
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return uncaughtException != null;
+ }
+ }, 60000, "Should receive uncaught exception from one StreamThread.");
+
+ final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
+ committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
+ CONSUMER_GROUP_ID + "_ALL");
+
+ final List<KeyValue<Long, Long>> committedRecordsAfterFailure = readResult(
+ uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
+ CONSUMER_GROUP_ID);
+
+ final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
+ allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
+ allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
+
+ final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ expectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
+ expectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
+
+ checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
+ checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
+ } finally {
+ streams.close();
+ }
+ }
+
+ private KafkaStreams getKafkaStreams(final boolean withState) {
+ commitRequested = new AtomicInteger(0);
+ final KStreamBuilder builder = new KStreamBuilder();
+
+ String[] storeNames = null;
+ if (withState) {
+ storeNames = new String[] {storeName};
+ final StateStoreSupplier storeSupplier = Stores.create(storeName)
+ .withLongKeys()
+ .withLongValues()
+ .persistent()
+ .build();
+
+ builder.addStateStore(storeSupplier);
+ }
+ final KStream<Long, Long> input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
+ input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, Long>>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Transformer<Long, Long, KeyValue<Long, Long>> get() {
+ return new Transformer<Long, Long, KeyValue<Long, Long>>() {
+ ProcessorContext context;
+ int processedRecords = 0;
+ KeyValueStore<Long, Long> state = null;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ final Integer hashCode = hashCode();
+ if (!maxPartitionNumberSeen.containsKey(hashCode)) {
+ if (maxPartitionNumberSeen.size() < 2) {
+ // initial startup case
+ maxPartitionNumberSeen.put(hashCode, -1);
+ } else {
+ // recovery case -- we need to "protect" the new instance of Transformer
+ // to throw the injected exception again
+ maxPartitionNumberSeen.put(hashCode, Integer.MAX_VALUE);
+ }
+ }
+ this.context = context;
+
+ if (withState) {
+ state = (KeyValueStore<Long, Long>) context.getStateStore(storeName);
+ }
+ }
+
+ @Override
+ public KeyValue<Long, Long> transform(final Long key, final Long value) {
+ final Integer hashCode = hashCode();
+ int maxPartitionNumber = maxPartitionNumberSeen.get(hashCode);
+ maxPartitionNumber = Math.max(maxPartitionNumber, context.partition());
+ maxPartitionNumberSeen.put(hashCode, maxPartitionNumber);
+ if (maxPartitionNumber == 0 && injectError) {
+ throw new RuntimeException("Injected test exception.");
+ }
+
+ if (++processedRecords % 10 == 0) {
+ context.commit();
+ commitRequested.incrementAndGet();
+ }
+
+ if (state != null) {
+ Long sum = state.get(key);
+ if (sum == null) {
+ sum = value;
+ } else {
+ sum += value;
+ }
+ state.put(key, sum);
+ context.forward(key, sum);
+ return null;
+ }
+ return new KeyValue<>(key, value);
+ }
+
+ @Override
+ public KeyValue<Long, Long> punctuate(final long timestamp) {
+ return null;
+ }
+
+ @Override
+ public void close() { }
+ };
+ }
+ }, storeNames)
+ .to(SINGLE_PARTITION_OUTPUT_TOPIC);
+
+ final KafkaStreams streams = new KafkaStreams(
+ builder,
+ StreamsTestUtils.getStreamsConfig(
+ applicationId,
+ CLUSTER.bootstrapServers(),
+ Serdes.LongSerde.class.getName(),
+ Serdes.LongSerde.class.getName(),
+ new Properties() {
+ {
+ put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+ put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, -1);
+ put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
+ put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+ put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ }
+ }));
+
+ streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e) {
+ if (uncaughtException != null) {
+ e.printStackTrace(System.err);
+ fail("Should only get one uncaught exception from Streams.");
+ }
+ uncaughtException = e;
+ }
+ });
+
+ return streams;
+ }
+
+ private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ MULTI_PARTITION_INPUT_TOPIC,
+ records,
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
+ CLUSTER.time
+ );
+ }
+
+ private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
+ final String groupId) throws Exception {
+ if (groupId != null) {
+ return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ groupId,
+ LongDeserializer.class,
+ LongDeserializer.class,
+ new Properties() {
+ {
+ put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
+ }
+ }),
+ SINGLE_PARTITION_OUTPUT_TOPIC,
+ numberOfRecords
+ );
+ }
+
+ // read uncommitted
+ return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
+ SINGLE_PARTITION_OUTPUT_TOPIC,
+ numberOfRecords
+ );
+ }
+
+ @Ignore
+ @Test
+ public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
+ // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
+ // the app is supposed to emit all 40 update records into the output topic
+ // the app commits after every 10 records per partition, and thus will have 2*5 uncommitted writes
+ // and store updates (i.e., another 5 uncommitted writes to a changelog topic per partition)
+ //
+ // the failure gets injected after 20 committed and 30 uncommitted records have been received
+ // -> the failure only kills one thread
+ // after failover, we should read 40 committed records and the state stores should contain the correct sums
+ // per key (even if some records got processed twice)
+
+ final KafkaStreams streams = getKafkaStreams(true);
+ try {
+ streams.start();
+
+ final List<KeyValue<Long, Long>> committedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, 0L));
+ add(new KeyValue<>(0L, 1L));
+ add(new KeyValue<>(0L, 2L));
+ add(new KeyValue<>(0L, 3L));
+ add(new KeyValue<>(0L, 4L));
+ add(new KeyValue<>(0L, 5L));
+ add(new KeyValue<>(0L, 6L));
+ add(new KeyValue<>(0L, 7L));
+ add(new KeyValue<>(0L, 8L));
+ add(new KeyValue<>(0L, 9L));
+ add(new KeyValue<>(1L, 0L));
+ add(new KeyValue<>(1L, 1L));
+ add(new KeyValue<>(1L, 2L));
+ add(new KeyValue<>(1L, 3L));
+ add(new KeyValue<>(1L, 4L));
+ add(new KeyValue<>(1L, 5L));
+ add(new KeyValue<>(1L, 6L));
+ add(new KeyValue<>(1L, 7L));
+ add(new KeyValue<>(1L, 8L));
+ add(new KeyValue<>(1L, 9L));
+ }
+ };
+ final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, 10L));
+ add(new KeyValue<>(0L, 11L));
+ add(new KeyValue<>(0L, 12L));
+ add(new KeyValue<>(0L, 13L));
+ add(new KeyValue<>(0L, 14L));
+ add(new KeyValue<>(1L, 10L));
+ add(new KeyValue<>(1L, 11L));
+ add(new KeyValue<>(1L, 12L));
+ add(new KeyValue<>(1L, 13L));
+ add(new KeyValue<>(1L, 14L));
+ }
+ };
+
+ final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
+ dataBeforeFailure.addAll(committedDataBeforeFailure);
+ dataBeforeFailure.addAll(uncommittedDataBeforeFailure);
+
+ final List<KeyValue<Long, Long>> dataAfterFailure = new ArrayList<KeyValue<Long, Long>>() {
+ {
+ add(new KeyValue<>(0L, 15L));
+ add(new KeyValue<>(0L, 16L));
+ add(new KeyValue<>(0L, 17L));
+ add(new KeyValue<>(0L, 18L));
+ add(new KeyValue<>(0L, 19L));
+ add(new KeyValue<>(1L, 15L));
+ add(new KeyValue<>(1L, 16L));
+ add(new KeyValue<>(1L, 17L));
+ add(new KeyValue<>(1L, 18L));
+ add(new KeyValue<>(1L, 19L));
+ }
+ };
+
+ writeInputData(committedDataBeforeFailure);
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return commitRequested.get() == 2;
+ }
+ }, 60000, "StreamsTasks did not request commit.");
+
+ writeInputData(uncommittedDataBeforeFailure);
+
+ final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null);
+ final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID);
+
+ final List<KeyValue<Long, Long>> expectedResultBeforeFailure = computeExpectedResult(dataBeforeFailure);
+ checkResultPerKey(committedRecords, computeExpectedResult(committedDataBeforeFailure));
+ checkResultPerKey(uncommittedRecords, expectedResultBeforeFailure);
+ verifyStateStore(streams, getMaxPerKey(expectedResultBeforeFailure));
+
+ injectError = true;
+ writeInputData(dataAfterFailure);
+
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return uncaughtException != null;
+ }
+ }, 60000, "Should receive uncaught exception from one StreamThread.");
+
+ final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
+ committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
+ CONSUMER_GROUP_ID + "_ALL");
+
+ final List<KeyValue<Long, Long>> committedRecordsAfterFailure = readResult(
+ uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
+ CONSUMER_GROUP_ID);
+
+ final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+ allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
+ allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
+ allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
+
+ final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
+
+ checkResultPerKey(allCommittedRecords, expectedResult);
+ checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
+
+ verifyStateStore(streams, getMaxPerKey(expectedResult));
+ } finally {
+ streams.close();
+ }
+ }
+
+ private List<KeyValue<Long, Long>> computeExpectedResult(final List<KeyValue<Long, Long>> input) {
+ final List<KeyValue<Long, Long>> expectedResult = new ArrayList<>(input.size());
+
+ final HashMap<Long, Long> sums = new HashMap<>();
+
+ for (final KeyValue<Long, Long> record : input) {
+ Long sum = sums.get(record.key);
+ if (sum == null) {
+ sum = record.value;
+ } else {
+ sum += record.value;
+ }
+ sums.put(record.key, sum);
+ expectedResult.add(new KeyValue<>(record.key, sum));
+ }
+
+ return expectedResult;
+ }
+
+ private Set<KeyValue<Long, Long>> getMaxPerKey(final List<KeyValue<Long, Long>> input) {
+ final Set<KeyValue<Long, Long>> expectedResult = new HashSet<>(input.size());
+
+ final HashMap<Long, Long> maxPerKey = new HashMap<>();
+
+ for (final KeyValue<Long, Long> record : input) {
+ final Long max = maxPerKey.get(record.key);
+ if (max == null || record.value > max) {
+ maxPerKey.put(record.key, record.value);
+ }
+
+ }
+
+ for (final Map.Entry<Long, Long> max : maxPerKey.entrySet()) {
+ expectedResult.add(new KeyValue<>(max.getKey(), max.getValue()));
+ }
+
+ return expectedResult;
+ }
+
+ private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Long, Long>> expectedStoreContent) {
+ ReadOnlyKeyValueStore<Long, Long> store = null;
+
+ final long maxWaitingTime = System.currentTimeMillis() + 300000L;
+ while (System.currentTimeMillis() < maxWaitingTime) {
+ try {
+ store = streams.store(storeName, QueryableStoreTypes.<Long, Long>keyValueStore());
+ break;
+ } catch (final InvalidStateStoreException okJustRetry) {
+ try {
+ Thread.sleep(5000L);
+ } catch (final Exception ignore) { }
+ }
+ }
+
+ final KeyValueIterator<Long, Long> it = store.all();
+ while (it.hasNext()) {
+ assertTrue(expectedStoreContent.remove(it.next()));
+ }
+
+ assertTrue(expectedStoreContent.isEmpty());
+ }
+
+}
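The verification idea used throughout this test: a consumer configured with isolation.level=read_committed only returns records from committed transactions, so comparing its output against the input data proves exactly-once delivery. A standalone sketch of such a consumer (broker address and topic name are placeholders; the test builds the equivalent config via TestUtils.consumerConfig):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;

    public class ReadCommittedSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "readCommitted");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // the crucial setting: skip records from aborted or still-open transactions
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (final KafkaConsumer<Long, Long> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("outputTopic")); // placeholder topic
                final ConsumerRecords<Long, Long> records = consumer.poll(1000L); // pre-2.0 poll signature
                for (final ConsumerRecord<Long, Long> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }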
http://git-wip-us.apache.org/repos/asf/kafka/blob/39c1e625/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index b8c86f2..7d5b5ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -588,6 +588,17 @@ public class StreamTaskTest {
}
@Test
+ public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.suspend();
+ assertTrue(producer.transactionCommitted());
+ assertFalse(producer.transactionInFlight());
+ }
+
+ @Test
public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,