Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/25 16:34:31 UTC
kafka git commit: KAFKA-5362: Add Streams EOS system test with repartitioning topic
Repository: kafka
Updated Branches:
refs/heads/trunk a5c47db13 -> 226583480
KAFKA-5362: Add Streams EOS system test with repartitioning topic
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #3310 from mjsax/kafka-5362-add-eos-system-tests-for-streams-api
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/22658348
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/22658348
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/22658348
Branch: refs/heads/trunk
Commit: 2265834803649d2f9fb090e86d11309f6e9789cb
Parents: a5c47db
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sun Jun 25 09:34:27 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sun Jun 25 09:34:27 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/StreamsConfig.java | 2 +-
.../internals/AbstractProcessorContext.java | 8 +-
.../kafka/streams/tests/EosTestClient.java | 43 ++-
.../kafka/streams/tests/EosTestDriver.java | 375 ++++++++++++-------
.../kafka/streams/tests/SmokeTestUtil.java | 63 ++--
.../kafka/streams/tests/StreamsEosTest.java | 13 +-
tests/kafkatest/services/streams.py | 10 +
.../kafkatest/tests/streams/streams_eos_test.py | 94 +++--
8 files changed, 391 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
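For readers scanning the per-file diffs below: the EOS test topology consumes topic "data" and, in the new "complex" mode, adds a repartitioning step whose output feeds two further aggregations ("max" and "cnt"). The following is a minimal sketch of that shape only, using the same 0.11-era KStreamBuilder API as the diff; it is not the committed code (the min/sum/max aggregations are elided, and String/Integer default serdes are assumed in the StreamsConfig):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KGroupedStream;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class EosTopologySketch {
        // Sketch only; see EosTestClient below for the real topology.
        public static KStreamBuilder build(final boolean withRepartitioning) {
            final KStreamBuilder builder = new KStreamBuilder();
            final KStream<String, Integer> data = builder.stream("data");
            data.to("echo"); // pass-through copy, verified against the input
            if (withRepartitioning) {
                // through() writes to and re-reads from "repartition", creating a second
                // sub-topology whose writes are covered by separate EOS transactions
                final KStream<String, Integer> repartitioned = data.through("repartition");
                final KGroupedStream<String, Integer> grouped = repartitioned.groupByKey();
                grouped.count("cnt").to(Serdes.String(), Serdes.Long(), "cnt");
            }
            return builder;
        }
    }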
http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 02bebbc..e7a8c44 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -264,7 +264,7 @@ public class StreamsConfig extends AbstractConfig {
*/
@Deprecated
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
- private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
+ private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management. This config is deprecated and will be ignored as the Streams API does not use Zookeeper anymore.";
static {
CONFIG = new ConfigDef()
http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index b306210..04af9f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -56,8 +56,8 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
this.config = config;
this.metrics = metrics;
this.stateManager = stateManager;
- valueSerde = config.valueSerde();
- keySerde = config.keySerde();
+ valueSerde = config.defaultValueSerde();
+ keySerde = config.defaultKeySerde();
this.cache = cache;
}
@@ -160,7 +160,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
}
@Override
- public Map<String, Object> appConfigsWithPrefix(String prefix) {
+ public Map<String, Object> appConfigsWithPrefix(final String prefix) {
return config.originalsWithPrefix(prefix);
}
@@ -171,7 +171,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
@Override
public RecordContext recordContext() {
- return this.recordContext;
+ return recordContext;
}
@Override
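The appConfigsWithPrefix() touched above simply delegates to originalsWithPrefix(), which strips the prefix from the returned keys. A sketch of how a custom processor might use it from the public ProcessorContext; the "myext." keys are hypothetical, purely for illustration:

    import java.util.Map;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class PrefixConfigProcessor extends AbstractProcessor<String, String> {
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            // An app config "myext.timeout.ms" (hypothetical) shows up here as
            // "timeout.ms", because originalsWithPrefix() strips the prefix.
            final Map<String, Object> cfg = context.appConfigsWithPrefix("myext.");
            System.out.println("prefixed configs: " + cfg);
        }

        @Override
        public void process(final String key, final String value) { /* no-op */ }
    }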
http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index fe5dc64..7550109 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -34,13 +34,16 @@ public class EosTestClient extends SmokeTestUtil {
static final String APP_ID = "EosTest";
private final String kafka;
private final File stateDir;
+ private final boolean withRepartitioning;
+
private KafkaStreams streams;
private boolean uncaughtException;
- EosTestClient(final File stateDir, final String kafka) {
+ EosTestClient(final String kafka, final File stateDir, final boolean withRepartitioning) {
super();
- this.stateDir = stateDir;
this.kafka = kafka;
+ this.stateDir = stateDir;
+ this.withRepartitioning = withRepartitioning;
}
private boolean isRunning = true;
@@ -81,8 +84,8 @@ public class EosTestClient extends SmokeTestUtil {
}
}
- private static KafkaStreams createKafkaStreams(final File stateDir,
- final String kafka) {
+ private KafkaStreams createKafkaStreams(final File stateDir,
+ final String kafka) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -144,6 +147,38 @@ public class EosTestClient extends SmokeTestUtil {
"sum")
.to(stringSerde, longSerde, "sum");
+ if (withRepartitioning) {
+ final KStream<String, Integer> repartitionedData = data.through("repartition");
+
+ repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"));
+
+ final KGroupedStream<String, Integer> groupedDataAfterRepartitioning = repartitionedData.groupByKey();
+ // max
+ groupedDataAfterRepartitioning
+ .aggregate(
+ new Initializer<Integer>() {
+ @Override
+ public Integer apply() {
+ return Integer.MIN_VALUE;
+ }
+ },
+ new Aggregator<String, Integer, Integer>() {
+ @Override
+ public Integer apply(final String aggKey,
+ final Integer value,
+ final Integer aggregate) {
+ return (value > aggregate) ? value : aggregate;
+ }
+ },
+ intSerde,
+ "max")
+ .to(stringSerde, intSerde, "max");
+
+ // count
+ groupedDataAfterRepartitioning.count("cnt")
+ .to(stringSerde, longSerde, "cnt");
+ }
+
return new KafkaStreams(builder, props);
}
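Not visible in this hunk is the configuration that makes the client transactional in the first place. For reference, a Streams application opts into EOS via processing.guarantee; a sketch follows (the committed createKafkaStreams() may set more properties than shown, and the bootstrap address is a placeholder):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosConfigSketch {
        public static Properties eosProps(final String bootstrapServers) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "EosTest");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            // turns on exactly-once: offsets, state updates, and output commit atomically
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            return props;
        }
    }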
http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index a1b813b..af047a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -40,13 +40,13 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
@@ -68,7 +68,7 @@ public class EosTestDriver extends SmokeTestUtil {
});
final Properties producerProps = new Properties();
- producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+ producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -105,10 +105,10 @@ public class EosTestDriver extends SmokeTestUtil {
System.out.println(numRecordsProduced + " records produced");
}
- public static void verify(final String kafka) {
+ public static void verify(final String kafka, final boolean withRepartitioning) {
ensureStreamsApplicationDown(kafka);
- final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);
+ final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka, withRepartitioning);
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
@@ -117,27 +117,69 @@ public class EosTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+ final String[] allInputTopics;
+ final String[] allOutputTopics;
+ if (withRepartitioning) {
+ allInputTopics = new String[] {"data", "repartition"};
+ allOutputTopics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
+ } else {
+ allInputTopics = new String[] {"data"};
+ allOutputTopics = new String[] {"echo", "min", "sum"};
+ }
+
+ final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> inputRecordsPerTopicPerPartition;
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
- final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
+ final List<TopicPartition> partitions = getAllPartitions(consumer, allInputTopics);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
- final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition
- = getOutputRecords(consumer, committedOffsets);
+ inputRecordsPerTopicPerPartition = getRecords(consumer, committedOffsets, withRepartitioning, true);
+ } catch (final Exception e) {
+ e.printStackTrace(System.err);
+ System.out.println("FAILED");
+ return;
+ }
- truncate("data", recordPerTopicPerPartition, committedOffsets);
+ final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> outputRecordsPerTopicPerPartition;
+ try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+ final List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
- verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
- verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));
+ outputRecordsPerTopicPerPartition = getRecords(consumer, consumer.endOffsets(partitions), withRepartitioning, false);
+ } catch (final Exception e) {
+ e.printStackTrace(System.err);
+ System.out.println("FAILED");
+ return;
+ }
+
+ verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("echo"));
+ if (withRepartitioning) {
+ verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("repartition"));
+ }
+
+ verifyMin(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("min"));
+ verifySum(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("sum"));
- verifyAllTransactionFinished(consumer, kafka);
+ if (withRepartitioning) {
+ verifyMax(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("max"));
+ verifyCnt(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("cnt"));
+ }
- // do not modify: required test output
- System.out.println("ALL-RECORDS-DELIVERED");
+ try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+ final List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+
+ verifyAllTransactionFinished(consumer, kafka, withRepartitioning);
} catch (final Exception e) {
e.printStackTrace(System.err);
System.out.println("FAILED");
+ return;
}
+
+ // do not modify: required test output
+ System.out.println("ALL-RECORDS-DELIVERED");
}
private static void ensureStreamsApplicationDown(final String kafka) {
@@ -159,7 +201,8 @@ public class EosTestDriver extends SmokeTestUtil {
}
}
- private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka) {
+ private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka,
+ final boolean withRepartitioning) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
@@ -171,6 +214,9 @@ public class EosTestDriver extends SmokeTestUtil {
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
final Set<String> topics = new HashSet<>();
topics.add("data");
+ if (withRepartitioning) {
+ topics.add("repartition");
+ }
consumer.subscribe(topics);
consumer.poll(0);
@@ -190,8 +236,10 @@ public class EosTestDriver extends SmokeTestUtil {
return committedOffsets;
}
- private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
- final Map<TopicPartition, Long> committedOffsets) {
+ private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getRecords(final KafkaConsumer<byte[], byte[]> consumer,
+ final Map<TopicPartition, Long> readEndOffsets,
+ final boolean withRepartitioning,
+ final boolean isInputTopic) {
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -201,16 +249,20 @@ public class EosTestDriver extends SmokeTestUtil {
for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
- addRecord(record, recordPerTopicPerPartition);
+ final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+ final long readEndOffset = readEndOffsets.get(tp);
+ if (record.offset() < readEndOffset) {
+ addRecord(record, recordPerTopicPerPartition, withRepartitioning);
+ } else if (!isInputTopic) {
+ throw new RuntimeException("FAIL: did receive more records than expected for " + tp
+ + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
+ }
+ if (consumer.position(tp) >= readEndOffset) {
+ consumer.pause(Collections.singletonList(tp));
+ }
}
- if (receivedRecords.count() > 0) {
- allRecordsReceived =
- receivedAllRecords(
- recordPerTopicPerPartition.get("data"),
- recordPerTopicPerPartition.get("echo"),
- committedOffsets);
- }
+ allRecordsReceived = consumer.paused().size() == readEndOffsets.keySet().size();
}
if (!allRecordsReceived) {
@@ -221,16 +273,13 @@ public class EosTestDriver extends SmokeTestUtil {
}
private static void addRecord(final ConsumerRecord<byte[], byte[]> record,
- final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition) {
+ final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
+ final boolean withRepartitioning) {
final String topic = record.topic();
final TopicPartition partition = new TopicPartition(topic, record.partition());
- if ("data".equals(topic)
- || "echo".equals(topic)
- || "min".equals(topic)
- || "sum".equals(topic)) {
-
+ if (verifyTopic(topic, withRepartitioning)) {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition
= recordPerTopicPerPartition.get(topic);
@@ -250,156 +299,218 @@ public class EosTestDriver extends SmokeTestUtil {
}
}
- private static boolean receivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> supersetExpectedRecords,
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords,
- final Map<TopicPartition, Long> committedOffsets) {
- if (supersetExpectedRecords == null
- || receivedRecords == null
- || supersetExpectedRecords.keySet().size() < committedOffsets.keySet().size()
- || receivedRecords.keySet().size() < committedOffsets.keySet().size()) {
+ private static boolean verifyTopic(final String topic,
+ final boolean withRepartitioning) {
+ final boolean validTopic = "data".equals(topic) || "echo".equals(topic) || "min".equals(topic) || "sum".equals(topic);
- return false;
+ if (withRepartitioning) {
+ return validTopic || "repartition".equals(topic) || "max".equals(topic) || "cnt".equals(topic);
}
+ return validTopic;
+ }
+
+ private static void verifyReceivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> expectedRecords,
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords) {
+ if (expectedRecords.size() != receivedRecords.size()) {
+ throw new RuntimeException("Result verification failed. Received " + receivedRecords.size() + " records but expected " + expectedRecords.size());
+ }
+
+ final StringDeserializer stringDeserializer = new StringDeserializer();
+ final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
- final TopicPartition tp = partitionRecords.getKey();
- final int numberOfReceivedRecords = partitionRecords.getValue().size();
- final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
- if (committed != null) {
- if (numberOfReceivedRecords < committed) {
- return false;
+ final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+ final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = expectedRecords.get(inputTopicPartition).iterator();
+
+ for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
+ final ConsumerRecord<byte[], byte[]> expected = expectedRecord.next();
+
+ final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+ final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+ final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
+ final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
+
+ if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
+ throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
}
- } else if (numberOfReceivedRecords > 0) {
- throw new RuntimeException("Result verification failed for partition " + tp
- + ". No offset was committed but we received " + numberOfReceivedRecords + " records.");
}
}
+ }
+ private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
final StringDeserializer stringDeserializer = new StringDeserializer();
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
- for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
- try {
- final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
- final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecords = supersetExpectedRecords.get(inputTopicPartition).iterator();
- for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
- final ConsumerRecord<byte[], byte[]> expected = expectedRecords.next();
+ final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
+ for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
+ final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+ final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
+ final List<ConsumerRecord<byte[], byte[]>> partitionMin = partitionRecords.getValue();
- final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
- final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
- final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
- final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
+ if (partitionInput.size() != partitionMin.size()) {
+ throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
+ + partitionRecords.getKey() + " but received " + partitionMin.size());
+ }
- if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
- throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
- }
+ final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
+
+ for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionMin) {
+ final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+
+ final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+ final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+ final String key = stringDeserializer.deserialize(input.topic(), input.key());
+ final int value = integerDeserializer.deserialize(input.topic(), input.value());
+
+ Integer min = currentMinPerKey.get(key);
+ if (min == null) {
+ min = value;
+ } else {
+ min = Math.min(min, value);
+ }
+ currentMinPerKey.put(key, min);
+
+ if (!receivedKey.equals(key) || receivedValue != min) {
+ throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">");
}
- } catch (final NullPointerException | NoSuchElementException e) {
- return false;
}
}
-
- return true;
}
- private static void truncate(final String topic,
- final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
- final Map<TopicPartition, Long> committedOffsets) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecords = recordPerTopicPerPartition.get(topic);
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> truncatedTopicRecords = recordPerTopicPerPartition.get(topic);
+ private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> sumPerTopicPerPartition) {
+ final StringDeserializer stringDeserializer = new StringDeserializer();
+ final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
+ final LongDeserializer longDeserializer = new LongDeserializer();
- for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : topicRecords.entrySet()) {
- final TopicPartition tp = partitionRecords.getKey();
- final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
- truncatedTopicRecords.put(tp, partitionRecords.getValue().subList(0, committed != null ? committed.intValue() : 0));
- }
+ final HashMap<String, Long> currentSumPerKey = new HashMap<>();
+ for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : sumPerTopicPerPartition.entrySet()) {
+ final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
+ final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
+ final List<ConsumerRecord<byte[], byte[]>> partitionSum = partitionRecords.getValue();
+
+ if (partitionInput.size() != partitionSum.size()) {
+ throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
+ + partitionRecords.getKey() + " but received " + partitionSum.size());
+ }
+
+ final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
+
+ for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionSum) {
+ final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
- recordPerTopicPerPartition.put(topic, truncatedTopicRecords);
+ final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+ final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+ final String key = stringDeserializer.deserialize(input.topic(), input.key());
+ final int value = integerDeserializer.deserialize(input.topic(), input.value());
+
+ Long sum = currentSumPerKey.get(key);
+ if (sum == null) {
+ sum = (long) value;
+ } else {
+ sum += value;
+ }
+ currentSumPerKey.put(key, sum);
+
+ if (!receivedKey.equals(key) || receivedValue != sum) {
+ throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">");
+ }
+ }
+ }
}
- private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
+ private static void verifyMax(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> maxPerTopicPerPartition) {
final StringDeserializer stringDeserializer = new StringDeserializer();
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
- for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
- try {
- final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
- final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition).iterator();
+ for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : maxPerTopicPerPartition.entrySet()) {
+ final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
+ final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
+ final List<ConsumerRecord<byte[], byte[]>> partitionMax = partitionRecords.getValue();
+
+ if (partitionInput.size() != partitionMax.size()) {
+ throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
+ + partitionRecords.getKey() + " but received " + partitionMax.size());
+ }
- for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
- final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+ final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
- final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
- final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
- final String key = stringDeserializer.deserialize(input.topic(), input.key());
- final int value = integerDeserializer.deserialize(input.topic(), input.value());
+ for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionMax) {
+ final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+ final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+ final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+ final String key = stringDeserializer.deserialize(input.topic(), input.key());
+ final int value = integerDeserializer.deserialize(input.topic(), input.value());
- Integer min = currentMinPerKey.get(key);
- if (min == null) {
- min = value;
- } else {
- min = Math.min(min, value);
- }
- currentMinPerKey.put(key, min);
- if (!receivedKey.equals(key) || receivedValue != min) {
- throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">");
- }
+ Integer max = currentMinPerKey.get(key);
+ if (max == null) {
+ max = Integer.MIN_VALUE;
+ }
+ max = Math.max(max, value);
+ currentMinPerKey.put(key, max);
+
+ if (!receivedKey.equals(key) || receivedValue != max.intValue()) {
+ throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + max + "> but was <" + receivedKey + "," + receivedValue + ">");
}
- } catch (final NullPointerException e) {
- System.err.println(inputPerTopicPerPartition);
- e.printStackTrace(System.err);
- throw e;
}
}
}
- private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
+ private static void verifyCnt(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
final StringDeserializer stringDeserializer = new StringDeserializer();
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
final LongDeserializer longDeserializer = new LongDeserializer();
final HashMap<String, Long> currentSumPerKey = new HashMap<>();
- for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
- try {
- final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
- final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition).iterator();
-
- for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
- final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
-
- final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
- final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
- final String key = stringDeserializer.deserialize(input.topic(), input.key());
- final int value = integerDeserializer.deserialize(input.topic(), input.value());
-
- Long sum = currentSumPerKey.get(key);
- if (sum == null) {
- sum = (long) value;
- } else {
- sum += value;
- }
- currentSumPerKey.put(key, sum);
+ for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : cntPerTopicPerPartition.entrySet()) {
+ final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
+ final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
+ final List<ConsumerRecord<byte[], byte[]>> partitionCnt = partitionRecords.getValue();
+
+ if (partitionInput.size() != partitionCnt.size()) {
+ throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
+ + partitionRecords.getKey() + " but received " + partitionCnt.size());
+ }
- if (!receivedKey.equals(key) || receivedValue != sum) {
- throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">");
- }
+ final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
+
+ for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionCnt) {
+ final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
+
+ final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
+ final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
+ final String key = stringDeserializer.deserialize(input.topic(), input.key());
+
+ Long cnt = currentSumPerKey.get(key);
+ if (cnt == null) {
+ cnt = 0L;
+ }
+ currentSumPerKey.put(key, ++cnt);
+
+ if (!receivedKey.equals(key) || receivedValue != cnt.longValue()) {
+ throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + receivedKey + "," + receivedValue + ">");
}
- } catch (final NullPointerException e) {
- System.err.println(inputPerTopicPerPartition);
- e.printStackTrace(System.err);
- throw e;
}
}
}
private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
- final String kafka) {
- final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "min", "sum");
+ final String kafka,
+ final boolean withRepartitioning) {
+ final String[] topics;
+ if (withRepartitioning) {
+ topics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
+ } else {
+ topics = new String[] {"echo", "min", "sum"};
+ }
+
+ final List<TopicPartition> partitions = getAllPartitions(consumer, topics);
consumer.assign(partitions);
consumer.seekToEnd(partitions);
consumer.poll(0);
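The getRecords() rewrite above replaces count-based completion checks with a simpler invariant: read each partition up to a known end offset and pause it once the consumer's position reaches that offset; the loop is done when every partition is paused. Condensed into a standalone sketch (the real method also buckets records per topic and partition and aborts after MAX_IDLE_TIME_MS of inactivity):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class DrainSketch {
        // Assumes the consumer is already assigned and seeked to the beginning,
        // and that endOffsets has an entry for every assigned partition.
        static List<ConsumerRecord<byte[], byte[]>> drainTo(final KafkaConsumer<byte[], byte[]> consumer,
                                                            final Map<TopicPartition, Long> endOffsets) {
            final List<ConsumerRecord<byte[], byte[]>> result = new ArrayList<>();
            while (consumer.paused().size() < endOffsets.size()) {
                for (final ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
                    final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    if (record.offset() < endOffsets.get(tp)) {
                        result.add(record);
                    }
                    if (consumer.position(tp) >= endOffsets.get(tp)) {
                        consumer.pause(Collections.singletonList(tp)); // this partition is done
                    }
                }
            }
            return result;
        }
    }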
http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 309c90d..150ec7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -32,31 +32,32 @@ import java.io.File;
public class SmokeTestUtil {
- public final static int WINDOW_SIZE = 100;
- public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
- public final static int END = Integer.MAX_VALUE;
+ final static int END = Integer.MAX_VALUE;
- public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
+ static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
return printProcessorSupplier(topic, false);
}
- public static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final boolean printOffset) {
+ private static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final boolean printOffset) {
return new ProcessorSupplier<Object, Object>() {
+ @Override
public Processor<Object, Object> get() {
return new AbstractProcessor<Object, Object>() {
private int numRecordsProcessed = 0;
private ProcessorContext context;
@Override
- public void init(ProcessorContext context) {
+ public void init(final ProcessorContext context) {
System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
this.context = context;
}
@Override
- public void process(Object key, Object value) {
- if (printOffset) System.out.println(">>> " + context.offset());
+ public void process(final Object key, final Object value) {
+ if (printOffset) {
+ System.out.println(">>> " + context.offset());
+ }
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
@@ -64,29 +65,28 @@ public class SmokeTestUtil {
}
@Override
- public void punctuate(long timestamp) {
- }
+ public void punctuate(final long timestamp) { }
@Override
- public void close() {
- }
+ public void close() { }
};
}
};
}
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
- public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
- return new KeyValue<K, V>(winKey.key(), value);
+ @Override
+ public KeyValue<K, V> apply(final Windowed<K> winKey, final V value) {
+ return new KeyValue<>(winKey.key(), value);
}
}
public static class Agg {
- public KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
+ KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
return new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
@Override
- public KeyValue<String, Long> apply(String key, Long value) {
+ public KeyValue<String, Long> apply(final String key, final Long value) {
return new KeyValue<>(value == null ? null : Long.toString(value), 1L);
}
};
@@ -101,19 +101,19 @@ public class SmokeTestUtil {
};
}
- public Aggregator<String, Long, Long> adder() {
+ Aggregator<String, Long, Long> adder() {
return new Aggregator<String, Long, Long>() {
@Override
- public Long apply(String aggKey, Long value, Long aggregate) {
+ public Long apply(final String aggKey, final Long value, final Long aggregate) {
return aggregate + value;
}
};
}
- public Aggregator<String, Long, Long> remover() {
+ Aggregator<String, Long, Long> remover() {
return new Aggregator<String, Long, Long>() {
@Override
- public Long apply(String aggKey, Long value, Long aggregate) {
+ public Long apply(final String aggKey, final Long value, final Long aggregate) {
return aggregate - value;
}
};
@@ -124,33 +124,22 @@ public class SmokeTestUtil {
public static Serde<Integer> intSerde = Serdes.Integer();
- public static Serde<Long> longSerde = Serdes.Long();
+ static Serde<Long> longSerde = Serdes.Long();
- public static Serde<Double> doubleSerde = Serdes.Double();
+ static Serde<Double> doubleSerde = Serdes.Double();
- public static File createDir(String path) throws Exception {
- File dir = new File(path);
+ static File createDir(final File parent, final String child) throws Exception {
+ final File dir = new File(parent, child);
dir.mkdir();
return dir;
}
- public static File createDir(File parent, String child) throws Exception {
- File dir = new File(parent, child);
-
- dir.mkdir();
-
- return dir;
- }
-
- public static void sleep(long duration) {
+ public static void sleep(final long duration) {
try {
Thread.sleep(duration);
- } catch (Exception ex) {
- //
- }
+ } catch (final Exception ignore) { }
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index 58499aa..d508286 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -30,9 +30,9 @@ public class StreamsEosTest {
final String command = args.length > 2 ? args[2] : null;
System.out.println("StreamsTest instance started");
- System.out.println("command=" + command);
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
+ System.out.println("command=" + command);
if (command == null || stateDir == null) {
System.exit(-1);
@@ -43,11 +43,16 @@ public class StreamsEosTest {
EosTestDriver.generate(kafka);
break;
case "process":
- final EosTestClient client = new EosTestClient(new File(stateDir), kafka);
- client.start();
+ new EosTestClient(kafka, new File(stateDir), false).start();
+ break;
+ case "process-complex":
+ new EosTestClient(kafka, new File(stateDir), true).start();
break;
case "verify":
- EosTestDriver.verify(kafka);
+ EosTestDriver.verify(kafka, false);
+ break;
+ case "verify-complex":
+ EosTestDriver.verify(kafka, true);
break;
default:
System.out.println("unknown command: " + command);
http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index f6b6dd4..a0d9c57 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -181,11 +181,21 @@ class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
+class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
+ def __init__(self, test_context, kafka):
+ super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
+
+
class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
+class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
+ def __init__(self, test_context, kafka):
+ super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex")
+
+
class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, "close-deadlock-test")
http://git-wip-us.apache.org/repos/asf/kafka/blob/22658348/tests/kafkatest/tests/streams/streams_eos_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 305cde0..be2c4f8 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -16,7 +16,8 @@
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, StreamsEosTestVerifyRunnerService
+from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
+ StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
import time
@@ -30,76 +31,99 @@ class StreamsEosTest(KafkaTest):
'data' : { 'partitions': 5, 'replication-factor': 2 },
'echo' : { 'partitions': 5, 'replication-factor': 2 },
'min' : { 'partitions': 5, 'replication-factor': 2 },
- 'sum' : { 'partitions': 5, 'replication-factor': 2 }
+ 'sum' : { 'partitions': 5, 'replication-factor': 2 },
+ 'repartition' : { 'partitions': 5, 'replication-factor': 2 },
+ 'max' : { 'partitions': 5, 'replication-factor': 2 },
+ 'cnt' : { 'partitions': 5, 'replication-factor': 2 }
})
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
- self.processor1 = StreamsEosTestJobRunnerService(test_context, self.kafka)
- self.processor2 = StreamsEosTestJobRunnerService(test_context, self.kafka)
- self.verifier = StreamsEosTestVerifyRunnerService(test_context, self.kafka)
+ self.test_context = test_context
@cluster(num_nodes=8)
- def test_rebalance(self):
+ def test_rebalance_simple(self):
+ self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
+
+ @cluster(num_nodes=8)
+ def test_rebalance_complex(self):
+ self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
+
+ def run_rebalance(self, processor1, processor2, verifier):
"""
Starts and stops two test clients a few times.
Ensure that all records are delivered exactly-once.
"""
self.driver.start()
- self.processor1.start()
+ processor1.start()
- time.sleep(30)
+ time.sleep(120)
- self.processor2.start()
+ processor2.start()
- time.sleep(30)
- self.processor1.stop()
+ time.sleep(120)
+ processor1.stop()
- time.sleep(30)
- self.processor1.start()
+ time.sleep(120)
+ processor1.start()
- time.sleep(30)
- self.processor2.stop()
+ time.sleep(120)
+ processor2.stop()
- time.sleep(30)
+ time.sleep(120)
self.driver.stop()
- self.processor1.stop()
- self.processor2.stop()
+ processor1.stop()
+ processor2.stop()
- self.verifier.start()
- self.verifier.wait()
+ verifier.start()
+ verifier.wait()
- self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)
+ verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=8)
def test_failure_and_recovery(self):
+ self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
+
+ @cluster(num_nodes=8)
+ def test_failure_and_recovery_complex(self):
+ self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+ StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
+
+ def run_failure_and_recovery(self, processor1, processor2, verifier):
"""
Starts two test clients, then abort (kill -9) and restart them a few times.
Ensure that all records are delivered exactly-once.
"""
self.driver.start()
- self.processor1.start()
- self.processor2.start()
+ processor1.start()
+ processor2.start()
- time.sleep(30)
- self.processor1.abortThenRestart()
+ time.sleep(120)
+ processor1.abortThenRestart()
- time.sleep(30)
- self.processor1.abortThenRestart()
+ time.sleep(120)
+ processor1.abortThenRestart()
- time.sleep(30)
- self.processor2.abortThenRestart()
+ time.sleep(120)
+ processor2.abortThenRestart()
- time.sleep(30)
+ time.sleep(120)
self.driver.stop()
- self.processor1.stop()
- self.processor2.stop()
+ processor1.stop()
+ processor2.stop()
- self.verifier.start()
- self.verifier.wait()
+ verifier.start()
+ verifier.wait()
- self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)
+ verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)