You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/06/03 16:13:43 UTC
[kafka] branch trunk updated: KAFKA-10084: Fix EosTestDriver end offset (#8785)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cb88be4 KAFKA-10084: Fix EosTestDriver end offset (#8785)
cb88be4 is described below
commit cb88be45ebc400069c03ad8b9a2332da0aecc7b8
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Jun 3 11:12:47 2020 -0500
KAFKA-10084: Fix EosTestDriver end offset (#8785)
Check the uncommitted end offset after the committed end offset,
so we can be sure never to miss a pending end-transaction marker.
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../apache/kafka/streams/tests/EosTestDriver.java | 43 ++++++++++------------
1 file changed, 20 insertions(+), 23 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index e95c354..45843aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -581,35 +581,32 @@ public class EosTestDriver extends SmokeTestUtil {
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- final Map<TopicPartition, Long> topicEndOffsets;
-
- try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
- topicEndOffsets = consumerUncommitted.endOffsets(partitions);
- }
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
- while (!topicEndOffsets.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
- consumer.seekToEnd(partitions);
-
- final Iterator<TopicPartition> iterator = partitions.iterator();
- while (iterator.hasNext()) {
- final TopicPartition topicPartition = iterator.next();
- final long position = consumer.position(topicPartition);
-
- if (position == topicEndOffsets.get(topicPartition)) {
- iterator.remove();
- topicEndOffsets.remove(topicPartition);
- System.out.println("Removing " + topicPartition + " at position " + position);
- } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
- throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
- } else {
- System.out.println("Retry " + topicPartition + " at position " + position);
+ try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
+ while (System.currentTimeMillis() < maxWaitTime) {
+ consumer.seekToEnd(partitions);
+ final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
+
+ final Iterator<TopicPartition> iterator = partitions.iterator();
+ while (iterator.hasNext()) {
+ final TopicPartition topicPartition = iterator.next();
+ final long position = consumer.position(topicPartition);
+
+ if (position == topicEndOffsets.get(topicPartition)) {
+ iterator.remove();
+ System.out.println("Removing " + topicPartition + " at position " + position);
+ } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
+ throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
+ } else {
+ System.out.println("Retry " + topicPartition + " at position " + position);
+ }
}
+ sleep(1000L);
}
- sleep(1000L);
}
- if (!topicEndOffsets.isEmpty()) {
+ if (!partitions.isEmpty()) {
throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
}
}