You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/03 16:13:43 UTC

[kafka] branch trunk updated: KAFKA-10084: Fix EosTestDriver end offset (#8785)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cb88be4  KAFKA-10084: Fix EosTestDriver end offset (#8785)
cb88be4 is described below

commit cb88be45ebc400069c03ad8b9a2332da0aecc7b8
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Jun 3 11:12:47 2020 -0500

    KAFKA-10084: Fix EosTestDriver end offset (#8785)
    
    On every retry, fetch the uncommitted end offsets only after seeking
    to the committed end offsets, so we can be sure never to miss a
    pending end-transaction marker.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 .../apache/kafka/streams/tests/EosTestDriver.java  | 43 ++++++++++------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index e95c354..45843aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -581,35 +581,32 @@ public class EosTestDriver extends SmokeTestUtil {
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 
-        final Map<TopicPartition, Long> topicEndOffsets;
-
-        try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
-            topicEndOffsets = consumerUncommitted.endOffsets(partitions);
-        }
 
         final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        while (!topicEndOffsets.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            consumer.seekToEnd(partitions);
-
-            final Iterator<TopicPartition> iterator = partitions.iterator();
-            while (iterator.hasNext()) {
-                final TopicPartition topicPartition = iterator.next();
-                final long position = consumer.position(topicPartition);
-
-                if (position == topicEndOffsets.get(topicPartition)) {
-                    iterator.remove();
-                    topicEndOffsets.remove(topicPartition);
-                    System.out.println("Removing " + topicPartition + " at position " + position);
-                } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
-                    throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
-                } else {
-                    System.out.println("Retry " + topicPartition + " at position " + position);
+        try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
+            while (System.currentTimeMillis() < maxWaitTime) {
+                consumer.seekToEnd(partitions);
+                final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
+
+                final Iterator<TopicPartition> iterator = partitions.iterator();
+                while (iterator.hasNext()) {
+                    final TopicPartition topicPartition = iterator.next();
+                    final long position = consumer.position(topicPartition);
+
+                    if (position == topicEndOffsets.get(topicPartition)) {
+                        iterator.remove();
+                        System.out.println("Removing " + topicPartition + " at position " + position);
+                    } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
+                        throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
+                    } else {
+                        System.out.println("Retry " + topicPartition + " at position " + position);
+                    }
                 }
+                sleep(1000L);
             }
-            sleep(1000L);
         }
 
-        if (!topicEndOffsets.isEmpty()) {
+        if (!partitions.isEmpty()) {
             throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
         }
     }