You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/20 21:07:53 UTC
[kafka] branch trunk updated: MINOR: follow up on Streams EOS system tests (#4593)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a050eb5 MINOR: follow up on Streams EOS system tests (#4593)
a050eb5 is described below
commit a050eb56f759954498a0a58993d754cebde144d5
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Tue Feb 20 13:07:50 2018 -0800
MINOR: follow up on Streams EOS system tests (#4593)
---
.../org/apache/kafka/streams/tests/EosTestDriver.java | 15 ++++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 7c7485d..752cdd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -511,7 +511,6 @@ public class EosTestDriver extends SmokeTestUtil {
private static void verifyCnt(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
final StringDeserializer stringDeserializer = new StringDeserializer();
- final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
final LongDeserializer longDeserializer = new LongDeserializer();
final HashMap<String, Long> currentSumPerKey = new HashMap<>();
@@ -552,7 +551,7 @@ public class EosTestDriver extends SmokeTestUtil {
final boolean withRepartitioning) {
final String[] topics;
if (withRepartitioning) {
- topics = new String[] {"echo", "min", "sum", "repartition", "max", "min"};
+ topics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
} else {
topics = new String[] {"echo", "min", "sum"};
}
@@ -560,7 +559,9 @@ public class EosTestDriver extends SmokeTestUtil {
final List<TopicPartition> partitions = getAllPartitions(consumer, topics);
consumer.assign(partitions);
consumer.seekToEnd(partitions);
- consumer.poll(0);
+ for (final TopicPartition tp : partitions) {
+ System.out.println(tp + " at position " + consumer.position(tp));
+ }
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "VerifyProducer");
@@ -591,6 +592,12 @@ public class EosTestDriver extends SmokeTestUtil {
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+ if (records.isEmpty()) {
+ System.out.println("No data received.");
+ for (final TopicPartition tp : partitions) {
+ System.out.println(tp + " at position " + consumer.position(tp));
+ }
+ }
for (final ConsumerRecord<byte[], byte[]> record : records) {
maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
final String topic = record.topic();
@@ -604,6 +611,8 @@ public class EosTestDriver extends SmokeTestUtil {
throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
"Expected record <'key','value'> from one of " + partitions + " but got"
+ " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]");
+ } else {
+ System.out.println("Verifying " + tp + " successful.");
}
} catch (final SerializationException e) {
throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.