You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/08 21:08:57 UTC
kafka git commit: KAFKA-5362: Add EOS system tests for Streams API
Repository: kafka
Updated Branches:
refs/heads/trunk 21194a63e -> ba07d828c
KAFKA-5362: Add EOS system tests for Streams API
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3201 from mjsax/kafka-5362-add-eos-system-tests-for-streams-api
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba07d828
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba07d828
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba07d828
Branch: refs/heads/trunk
Commit: ba07d828c545d542cc11db60249e660d88a20fbb
Parents: 21194a6
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jun 8 14:08:54 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 8 14:08:54 2017 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 4 +
.../processor/internals/StreamThread.java | 4 +-
.../kafka/streams/tests/EosTestClient.java | 150 ++++++
.../kafka/streams/tests/EosTestDriver.java | 470 +++++++++++++++++++
.../kafka/streams/tests/StreamsEosTest.java | 57 +++
tests/kafkatest/services/streams.py | 25 +
.../kafkatest/tests/streams/streams_eos_test.py | 105 +++++
7 files changed, 813 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 26f4a77..19f3a4d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -197,6 +197,10 @@
<allow pkg="org.I0Itec.zkclient" />
</subpackage>
+ <subpackage name="test">
+ <allow pkg="kafka.admin" />
+ </subpackage>
+
<subpackage name="state">
<allow pkg="org.rocksdb" />
</subpackage>
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 624a15e..23ce958 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -548,7 +548,7 @@ public class StreamThread extends Thread {
timerStartedMs = time.milliseconds();
// try to fetch some records if necessary
- final ConsumerRecords<byte[], byte[]> records = pollRequests(pollTimeMs);
+ final ConsumerRecords<byte[], byte[]> records = pollRequests();
if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
addRecordsToTasks(records);
@@ -573,7 +573,7 @@ public class StreamThread extends Thread {
* Get the next batch of records by polling.
* @return Next batch of records or null if no records available.
*/
- private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
+ private ConsumerRecords<byte[], byte[]> pollRequests() {
ConsumerRecords<byte[], byte[]> records = null;
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
new file mode 100644
index 0000000..fe5dc64
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class EosTestClient extends SmokeTestUtil {
+
+ static final String APP_ID = "EosTest";
+ private final String kafka;
+ private final File stateDir;
+ private KafkaStreams streams;
+ private boolean uncaughtException;
+
+ EosTestClient(final File stateDir, final String kafka) {
+ super();
+ this.stateDir = stateDir;
+ this.kafka = kafka;
+ }
+
+ private boolean isRunning = true;
+
+ public void start() {
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ isRunning = false;
+ streams.close(5, TimeUnit.SECONDS);
+ // do not remove these printouts since they are needed for health scripts
+ if (!uncaughtException) {
+ System.out.println("EOS-TEST-CLIENT-CLOSED");
+ }
+ }
+ }));
+
+ while (isRunning) {
+ if (streams == null) {
+ uncaughtException = false;
+
+ streams = createKafkaStreams(stateDir, kafka);
+ streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e) {
+ System.out.println("EOS-TEST-CLIENT-EXCEPTION");
+ e.printStackTrace();
+ uncaughtException = true;
+ }
+ });
+ streams.start();
+ }
+ if (uncaughtException) {
+ streams.close(5, TimeUnit.SECONDS);
+ streams = null;
+ }
+ sleep(1000);
+ }
+ }
+
+ private static KafkaStreams createKafkaStreams(final File stateDir,
+ final String kafka) {
+ final Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
+ props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+ props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
+ props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+
+
+ final KStreamBuilder builder = new KStreamBuilder();
+ final KStream<String, Integer> data = builder.stream("data");
+
+ data.to("echo");
+ data.process(SmokeTestUtil.printProcessorSupplier("data"));
+
+ final KGroupedStream<String, Integer> groupedData = data.groupByKey();
+ // min
+ groupedData
+ .aggregate(
+ new Initializer<Integer>() {
+ @Override
+ public Integer apply() {
+ return Integer.MAX_VALUE;
+ }
+ },
+ new Aggregator<String, Integer, Integer>() {
+ @Override
+ public Integer apply(final String aggKey,
+ final Integer value,
+ final Integer aggregate) {
+ return (value < aggregate) ? value : aggregate;
+ }
+ },
+ intSerde,
+ "min")
+ .to(stringSerde, intSerde, "min");
+
+ // sum
+ groupedData.aggregate(
+ new Initializer<Long>() {
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ },
+ new Aggregator<String, Integer, Long>() {
+ @Override
+ public Long apply(final String aggKey,
+ final Integer value,
+ final Long aggregate) {
+ return (long) value + aggregate;
+ }
+ },
+ longSerde,
+ "sum")
+ .to(stringSerde, longSerde, "sum");
+
+ return new KafkaStreams(builder, props);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
new file mode 100644
index 0000000..07a0b1a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import kafka.admin.AdminClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Data generator and result verifier for the Streams EOS system test.
+ */
+public class EosTestDriver extends SmokeTestUtil {
+
+    private static final int MAX_NUMBER_OF_KEYS = 100;
+    private static final long MAX_IDLE_TIME_MS = 300000L;
+
+    // Set to false by the shutdown hook thread and polled by the generator loop,
+    // so it must be volatile for the write to become visible to the loop.
+    private static volatile boolean isRunning = true;
+
+    /**
+     * Produces random {@code <String, Integer>} records to topic "data" until a JVM shutdown is triggered.
+     * Keys are the decimal strings of {@code [0, MAX_NUMBER_OF_KEYS)} and values are drawn from {@code [0, 10000)}.
+     * A failed send terminates the process with exit code 1.
+     *
+     * @param kafka bootstrap servers of the Kafka cluster
+     */
+    static void generate(final String kafka) throws Exception {
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                isRunning = false;
+            }
+        });
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+
+        final Random rand = new Random(System.currentTimeMillis());
+
+        int numRecordsProduced = 0;
+        // try-with-resources: the producer is flushed and closed even if the loop throws
+        try (final KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps)) {
+            while (isRunning) {
+                final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
+                final int value = rand.nextInt(10000);
+
+                final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
+
+                producer.send(record, new Callback() {
+                    @Override
+                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                        if (exception != null) {
+                            exception.printStackTrace();
+                            Exit.exit(1);
+                        }
+                    }
+                });
+
+                numRecordsProduced++;
+                if (numRecordsProduced % 1000 == 0) {
+                    System.out.println(numRecordsProduced + " records produced");
+                }
+                Utils.sleep(rand.nextInt(50));
+            }
+        }
+        System.out.println(numRecordsProduced + " records produced");
+    }
+
+    /**
+     * Verifies the output of the EOS test application.
+     *
+     * Waits until the application's consumer group has no members and reads the group's committed offsets
+     * for "data". It then reads "data", "echo", "min", and "sum" with {@code read_committed} isolation and
+     * checks the outputs against the input up to the committed offsets. Prints "ALL-RECORDS-DELIVERED" on
+     * success. If the output check fails, prints "FAILED" and writes a stack trace to stderr.
+     *
+     * @param kafka bootstrap servers of the Kafka cluster
+     */
+    public static void verify(final String kafka) {
+        ensureStreamsApplicationDown(kafka);
+
+        final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);
+
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition
+                = getOutputRecords(consumer, committedOffsets);
+
+            truncate("data", recordPerTopicPerPartition, committedOffsets);
+
+            verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
+            verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));
+
+            verifyAllTransactionFinished(consumer, kafka);
+
+            // do not modify: required test output
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } catch (final Exception e) {
+            e.printStackTrace(System.err);
+            System.out.println("FAILED");
+        }
+    }
+
+    /**
+     * Blocks until the consumer group {@link EosTestClient#APP_ID} has no members.
+     *
+     * @throws RuntimeException if the group still has members after {@code MAX_IDLE_TIME_MS}
+     */
+    private static void ensureStreamsApplicationDown(final String kafka) {
+        AdminClient adminClient = null;
+        try {
+            adminClient = AdminClient.createSimplePlaintext(kafka);
+
+            final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+            while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
+                if (System.currentTimeMillis() > maxWaitTime) {
+                    throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
+                }
+                sleep(1000);
+            }
+        } finally {
+            if (adminClient != null) {
+                adminClient.close();
+            }
+        }
+    }
+
+    /**
+     * Reads the offsets that group {@link EosTestClient#APP_ID} has committed for the partitions of "data".
+     *
+     * Partitions without a committed offset are omitted from the result. Otherwise the consumer would
+     * substitute its {@code auto.offset.reset} position, and callers could not tell "nothing committed"
+     * apart from a real commit. The consumer does not join the group and auto-commit is disabled, so
+     * the application's committed offsets are left untouched.
+     */
+    private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+
+        final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            final Set<TopicPartition> partitions = new HashSet<>(getAllPartitions(consumer, "data"));
+
+            for (final TopicPartition tp : partitions) {
+                final OffsetAndMetadata committed = consumer.committed(tp);
+                if (committed != null) {
+                    committedOffsets.put(tp, committed.offset());
+                }
+            }
+        }
+
+        return committedOffsets;
+    }
+
+    /**
+     * Polls the assigned topics until "echo" contains all records up to the committed input offsets.
+     * The wait ends with an exception after {@code MAX_IDLE_TIME_MS} without any new record.
+     *
+     * @return received records grouped by topic name and then by partition, in consumption order
+     */
+    private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
+                                                                                                          final Map<TopicPartition, Long> committedOffsets) {
+        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
+
+        long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        boolean allRecordsReceived = false;
+        while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
+            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);
+
+            for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
+                // idle timeout: restart the clock on every received record
+                maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+                addRecord(record, recordPerTopicPerPartition);
+            }
+
+            if (receivedRecords.count() > 0) {
+                allRecordsReceived =
+                    receivedAllRecords(
+                        recordPerTopicPerPartition.get("data"),
+                        recordPerTopicPerPartition.get("echo"),
+                        committedOffsets);
+            }
+        }
+
+        if (!allRecordsReceived) {
+            throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000) + " sec idle time.");
+        }
+
+        return recordPerTopicPerPartition;
+    }
+
+    /**
+     * Appends {@code record} to its topic/partition list.
+     *
+     * @throws RuntimeException if the record comes from a topic other than "data", "echo", "min", or "sum"
+     */
+    private static void addRecord(final ConsumerRecord<byte[], byte[]> record,
+                                  final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition) {
+
+        final String topic = record.topic();
+        final TopicPartition partition = new TopicPartition(topic, record.partition());
+
+        if ("data".equals(topic)
+            || "echo".equals(topic)
+            || "min".equals(topic)
+            || "sum".equals(topic)) {
+
+            Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition
+                = recordPerTopicPerPartition.get(topic);
+
+            if (topicRecordsPerPartition == null) {
+                topicRecordsPerPartition = new HashMap<>();
+                recordPerTopicPerPartition.put(topic, topicRecordsPerPartition);
+            }
+
+            List<ConsumerRecord<byte[], byte[]>> records = topicRecordsPerPartition.get(partition);
+            if (records == null) {
+                records = new ArrayList<>();
+                topicRecordsPerPartition.put(partition, records);
+            }
+            records.add(record);
+        } else {
+            throw new RuntimeException("FAIL: received data from unexpected topic: " + record);
+        }
+    }
+
+    /**
+     * Checks whether {@code receivedRecords} ("echo") already contains, for every committed input partition,
+     * at least as many records as were committed.
+     *
+     * Also checks that every received record matches the input record ("data") at the same position.
+     * Returns {@code false} while more input or output still has to be read.
+     *
+     * @throws RuntimeException if output exists for a partition without a committed offset, or if an
+     *                          output record differs from the corresponding input record
+     */
+    private static boolean receivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> supersetExpectedRecords,
+                                              final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords,
+                                              final Map<TopicPartition, Long> committedOffsets) {
+        if (supersetExpectedRecords == null
+            || receivedRecords == null
+            || supersetExpectedRecords.keySet().size() < committedOffsets.keySet().size()
+            || receivedRecords.keySet().size() < committedOffsets.keySet().size()) {
+
+            return false;
+        }
+
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
+            final TopicPartition tp = partitionRecords.getKey();
+            final int numberOfReceivedRecords = partitionRecords.getValue().size();
+            final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
+            if (committed != null) {
+                if (numberOfReceivedRecords < committed) {
+                    return false;
+                }
+            } else if (numberOfReceivedRecords > 0) {
+                throw new RuntimeException("Result verification failed for partition " + tp
+                    + ". No offset was committed but we received " + numberOfReceivedRecords + " records.");
+            }
+        }
+
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
+            final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+            final List<ConsumerRecord<byte[], byte[]>> expectedForPartition = supersetExpectedRecords.get(inputTopicPartition);
+            if (expectedForPartition == null) {
+                // no input read for this partition yet
+                return false;
+            }
+            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecords = expectedForPartition.iterator();
+
+            try {
+                for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
+                    final ConsumerRecord<byte[], byte[]> expected = expectedRecords.next();
+
+                    final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                    final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                    final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
+                    final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
+
+                    if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
+                        throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
+                    }
+                }
+            } catch (final NoSuchElementException e) {
+                // more output than input read so far for this partition; keep polling
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Replaces the per-partition record lists of {@code topic} with their prefixes up to the committed offset.
+     * Partitions without a committed offset become empty. This assumes offsets in {@code topic} map
+     * one-to-one onto list positions, starting at 0.
+     */
+    private static void truncate(final String topic,
+                                 final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
+                                 final Map<TopicPartition, Long> committedOffsets) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecords = recordPerTopicPerPartition.get(topic);
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> truncatedTopicRecords = new HashMap<>();
+
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : topicRecords.entrySet()) {
+            final TopicPartition tp = partitionRecords.getKey();
+            final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
+            truncatedTopicRecords.put(tp, partitionRecords.getValue().subList(0, committed != null ? committed.intValue() : 0));
+        }
+
+        recordPerTopicPerPartition.put(topic, truncatedTopicRecords);
+    }
+
+    /**
+     * Checks that the i-th record of every "min" partition equals the running per-key minimum after the
+     * i-th record of the matching "data" partition.
+     *
+     * @throws RuntimeException on the first mismatch
+     */
+    private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
+
+        final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
+            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords =
+                inputRecordsFor(inputPerTopicPerPartition, partitionRecords.getKey());
+
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
+                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+
+                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                final String key = stringDeserializer.deserialize(input.topic(), input.key());
+                final int value = integerDeserializer.deserialize(input.topic(), input.value());
+
+                final Integer previousMin = currentMinPerKey.get(key);
+                final int min = previousMin == null ? value : Math.min(previousMin, value);
+                currentMinPerKey.put(key, min);
+
+                if (!receivedKey.equals(key) || receivedValue != min) {
+                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">");
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks that the i-th record of every "sum" partition equals the running per-key sum after the
+     * i-th record of the matching "data" partition.
+     *
+     * @throws RuntimeException on the first mismatch
+     */
+    private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> sumPerTopicPerPartition) {
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
+        final LongDeserializer longDeserializer = new LongDeserializer();
+
+        final HashMap<String, Long> currentSumPerKey = new HashMap<>();
+        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : sumPerTopicPerPartition.entrySet()) {
+            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords =
+                inputRecordsFor(inputPerTopicPerPartition, partitionRecords.getKey());
+
+            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
+                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+
+                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+                final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+                final String key = stringDeserializer.deserialize(input.topic(), input.key());
+                final int value = integerDeserializer.deserialize(input.topic(), input.value());
+
+                final Long previousSum = currentSumPerKey.get(key);
+                final long sum = (previousSum == null ? 0L : previousSum) + value;
+                currentSumPerKey.put(key, sum);
+
+                if (!receivedKey.equals(key) || receivedValue != sum) {
+                    throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">");
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns an iterator over the "data" records of the partition with the same number as {@code outputPartition}.
+     *
+     * @throws RuntimeException if no input records exist for that partition
+     */
+    private static Iterator<ConsumerRecord<byte[], byte[]>> inputRecordsFor(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+                                                                            final TopicPartition outputPartition) {
+        final TopicPartition inputTopicPartition = new TopicPartition("data", outputPartition.partition());
+        final List<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition);
+        if (inputRecords == null) {
+            System.err.println(inputPerTopicPerPartition);
+            throw new RuntimeException("Result verification failed: received output for " + outputPartition
+                + " but no input records for " + inputTopicPartition);
+        }
+        return inputRecords.iterator();
+    }
+
+    /**
+     * Writes one marker record to every partition of "echo", "min", and "sum". It then checks that a
+     * {@code read_committed} consumer positioned at the previous log end reads exactly these markers.
+     * This shows that no open transaction blocks the last stable offset.
+     *
+     * @throws RuntimeException if an unexpected record is read or not all markers arrive within {@code MAX_IDLE_TIME_MS}
+     */
+    private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
+                                                     final String kafka) {
+        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "min", "sum");
+        consumer.assign(partitions);
+        consumer.seekToEnd(partitions);
+        // seekToEnd is lazy: resolve the end positions now, before the marker records are produced
+        for (final TopicPartition tp : partitions) {
+            consumer.position(tp);
+        }
+
+        final Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "VerifyProducer");
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+
+        try (final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
+            for (final TopicPartition tp : partitions) {
+                final ProducerRecord<String, String> record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value");
+
+                producer.send(record, new Callback() {
+                    @Override
+                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                        if (exception != null) {
+                            exception.printStackTrace();
+                            Exit.exit(1);
+                        }
+                    }
+                });
+            }
+        }
+
+        final StringDeserializer stringDeserializer = new StringDeserializer();
+
+        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
+            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            for (final ConsumerRecord<byte[], byte[]> record : records) {
+                final String topic = record.topic();
+                final TopicPartition tp = new TopicPartition(topic, record.partition());
+
+                try {
+                    final String key = stringDeserializer.deserialize(topic, record.key());
+                    final String value = stringDeserializer.deserialize(topic, record.value());
+
+                    // each partition must yield exactly one marker record; remove() fails on a second one
+                    if (!("key".equals(key) && "value".equals(value) && partitions.remove(tp))) {
+                        throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
+                            "Expected record <'key','value'> from one of " + partitions + " but got"
+                            + " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]");
+                    }
+                } catch (final SerializationException e) {
+                    throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
+                        "Expected record <'key','value'> from one of " + partitions + " but got " + record, e);
+                }
+            }
+        }
+        if (!partitions.isEmpty()) {
+            throw new RuntimeException("Could not read all verification records. Did not receive all of them within " + (MAX_IDLE_TIME_MS / 1000) + " sec.");
+        }
+    }
+
+    /**
+     * Returns all partitions of the given topics as reported by {@link KafkaConsumer#partitionsFor(String)}.
+     */
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer,
+                                                         final String... topics) {
+        final List<TopicPartition> partitions = new ArrayList<>();
+
+        for (final String topic : topics) {
+            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
+                partitions.add(new TopicPartition(info.topic(), info.partition()));
+            }
+        }
+        return partitions;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
new file mode 100644
index 0000000..58499aa
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import java.io.File;
+
+public class StreamsEosTest {
+
+    /**
+     * Entry point of the Streams EOS system test.
+     *
+     * args ::= kafka stateDir command
+     * command := "run" | "process" | "verify"
+     *
+     * "run" starts the input data generator, "process" runs the Streams application under test,
+     * and "verify" checks the application's output. The process exits with -1 if any argument is missing.
+     */
+    public static void main(final String[] args) throws Exception {
+        if (args.length == 0) {
+            System.err.println("usage: StreamsEosTest kafka stateDir command");
+            System.exit(-1);
+        }
+
+        final String kafka = args[0];
+        final String stateDir = args.length > 1 ? args[1] : null;
+        final String command = args.length > 2 ? args[2] : null;
+
+        System.out.println("StreamsTest instance started");
+        System.out.println("command=" + command);
+        System.out.println("kafka=" + kafka);
+        System.out.println("stateDir=" + stateDir);
+
+        if (command == null || stateDir == null) {
+            System.exit(-1);
+        }
+
+        switch (command) {
+            case "run":
+                EosTestDriver.generate(kafka);
+                break;
+            case "process":
+                final EosTestClient client = new EosTestClient(new File(stateDir), kafka);
+                client.start();
+                break;
+            case "verify":
+                EosTestDriver.verify(kafka);
+                break;
+            default:
+                System.out.println("unknown command: " + command);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 905320a..867e3f5 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -150,6 +150,16 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
command)
+class StreamsEosTestBaseService(StreamsTestBaseService):
+ """Base class for Streams EOS Test services providing some common settings and functionality"""
+
+ def __init__(self, test_context, kafka, command):
+ super(StreamsEosTestBaseService, self).__init__(test_context,
+ kafka,
+ "org.apache.kafka.streams.tests.StreamsEosTest",
+ command)
+
+
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
    """Smoke-test service launched with the "run" command."""

    def __init__(self, test_context, kafka):
        command = "run"
        super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, command)
@@ -160,6 +170,21 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process")
+class StreamsEosTestDriverService(StreamsEosTestBaseService):
+ def __init__(self, test_context, kafka):
+ super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")
+
+
+class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
+ def __init__(self, test_context, kafka):
+ super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
+
+
+class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
+ def __init__(self, test_context, kafka):
+ super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
+
+
class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
    """Smoke-test service launched with the "close-deadlock-test" command."""

    def __init__(self, test_context, kafka):
        command = "close-deadlock-test"
        super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, command)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ba07d828/tests/kafkatest/tests/streams/streams_eos_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
new file mode 100644
index 0000000..305cde0
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark.resource import cluster
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, StreamsEosTestVerifyRunnerService
+import time
+
+
+class StreamsEosTest(KafkaTest):
+ """
+ Test of Kafka Streams exactly-once semantics
+ """
+
+ def __init__(self, test_context):
+ super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
+ 'data' : { 'partitions': 5, 'replication-factor': 2 },
+ 'echo' : { 'partitions': 5, 'replication-factor': 2 },
+ 'min' : { 'partitions': 5, 'replication-factor': 2 },
+ 'sum' : { 'partitions': 5, 'replication-factor': 2 }
+ })
+ self.driver = StreamsEosTestDriverService(test_context, self.kafka)
+ self.processor1 = StreamsEosTestJobRunnerService(test_context, self.kafka)
+ self.processor2 = StreamsEosTestJobRunnerService(test_context, self.kafka)
+ self.verifier = StreamsEosTestVerifyRunnerService(test_context, self.kafka)
+
+ @cluster(num_nodes=8)
+ def test_rebalance(self):
+ """
+ Starts and stops two test clients a few times.
+ Ensure that all records are delivered exactly-once.
+ """
+
+ self.driver.start()
+ self.processor1.start()
+
+ time.sleep(30)
+
+ self.processor2.start()
+
+ time.sleep(30)
+ self.processor1.stop()
+
+ time.sleep(30)
+ self.processor1.start()
+
+ time.sleep(30)
+ self.processor2.stop()
+
+ time.sleep(30)
+
+ self.driver.stop()
+
+ self.processor1.stop()
+ self.processor2.stop()
+
+ self.verifier.start()
+ self.verifier.wait()
+
+ self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)
+
+ @cluster(num_nodes=8)
+ def test_failure_and_recovery(self):
+ """
+ Starts two test clients, then abort (kill -9) and restart them a few times.
+ Ensure that all records are delivered exactly-once.
+ """
+
+ self.driver.start()
+ self.processor1.start()
+ self.processor2.start()
+
+ time.sleep(30)
+ self.processor1.abortThenRestart()
+
+ time.sleep(30)
+ self.processor1.abortThenRestart()
+
+ time.sleep(30)
+ self.processor2.abortThenRestart()
+
+ time.sleep(30)
+
+ self.driver.stop()
+
+ self.processor1.stop()
+ self.processor2.stop()
+
+ self.verifier.start()
+ self.verifier.wait()
+
+ self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)