You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 17:24:46 UTC

[kafka] 02/03: KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7a72a2a6da414b4974963de89365b710c6730699
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Apr 29 20:42:13 2020 -0700

    KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
    
    Adds simple TRACE-level logging that should help diagnose cases where the worker cannot catch up to the end of an internal topic.
    
    Reviewers: Gwen Shapira <cs...@gmail.com>, Aakash Shah <as...@confluent.io>, Konstantine Karantasis <ko...@confluent.io>
---
 .../java/org/apache/kafka/connect/util/KafkaBasedLog.java    | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e78276a..e301581 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -281,9 +281,15 @@ public class KafkaBasedLog<K, V> {
             Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<TopicPartition, Long> entry = it.next();
-                if (consumer.position(entry.getKey()) >= entry.getValue())
+                TopicPartition topicPartition = entry.getKey();
+                long endOffset = entry.getValue();
+                long lastConsumedOffset = consumer.position(topicPartition);
+                if (lastConsumedOffset >= endOffset) {
+                    log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                     it.remove();
-                else {
+                } else {
+                    log.trace("Behind end offset {} for {}; last-read offset is {}",
+                            endOffset, topicPartition, lastConsumedOffset);
                     poll(Integer.MAX_VALUE);
                     break;
                 }
@@ -345,4 +351,4 @@ public class KafkaBasedLog<K, V> {
             }
         }
     }
-}
\ No newline at end of file
+}