You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/02/20 21:07:53 UTC

[kafka] branch trunk updated: MINOR: follow up on Streams EOS system tests (#4593)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a050eb5  MINOR: follow up on Streams EOS system tests (#4593)
a050eb5 is described below

commit a050eb56f759954498a0a58993d754cebde144d5
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Tue Feb 20 13:07:50 2018 -0800

    MINOR: follow up on Streams EOS system tests (#4593)
---
 .../org/apache/kafka/streams/tests/EosTestDriver.java     | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 7c7485d..752cdd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -511,7 +511,6 @@ public class EosTestDriver extends SmokeTestUtil {
     private static void verifyCnt(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
                                   final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
         final StringDeserializer stringDeserializer = new StringDeserializer();
-        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
         final LongDeserializer longDeserializer = new LongDeserializer();
 
         final HashMap<String, Long> currentSumPerKey = new HashMap<>();
@@ -552,7 +551,7 @@ public class EosTestDriver extends SmokeTestUtil {
                                                      final boolean withRepartitioning) {
         final String[] topics;
         if (withRepartitioning) {
-            topics = new String[] {"echo", "min", "sum", "repartition", "max", "min"};
+            topics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
         } else {
             topics = new String[] {"echo", "min", "sum"};
         }
@@ -560,7 +559,9 @@ public class EosTestDriver extends SmokeTestUtil {
         final List<TopicPartition> partitions = getAllPartitions(consumer, topics);
         consumer.assign(partitions);
         consumer.seekToEnd(partitions);
-        consumer.poll(0);
+        for (final TopicPartition tp : partitions) {
+            System.out.println(tp + " at position " + consumer.position(tp));
+        }
 
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "VerifyProducer");
@@ -591,6 +592,12 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
             final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            if (records.isEmpty()) {
+                System.out.println("No data received.");
+                for (final TopicPartition tp : partitions) {
+                    System.out.println(tp + " at position " + consumer.position(tp));
+                }
+            }
             for (final ConsumerRecord<byte[], byte[]> record : records) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                 final String topic = record.topic();
@@ -604,6 +611,8 @@ public class EosTestDriver extends SmokeTestUtil {
                         throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
                             "Expected record <'key','value'> from one of " + partitions + " but got"
                             + " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]");
+                    } else {
+                        System.out.println("Verifying " + tp + " successful.");
                     }
                 } catch (final SerializationException e) {
                     throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.