You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/04/30 03:48:15 UTC
[kafka] branch 2.5 updated: KAFKA-9919: Add logging to
KafkaBasedLog::readToLogEnd (#8554)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 794230a KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
794230a is described below
commit 794230a1cc97a75de969499cbf69f23a45d19c4c
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed Apr 29 20:42:13 2020 -0700
KAFKA-9919: Add logging to KafkaBasedLog::readToLogEnd (#8554)
Simple logging additions at TRACE level that should help when the worker can't get caught up to the end of an internal topic.
Reviewers: Gwen Shapira <cs...@gmail.com>, Aakash Shah <as...@confluent.io>, Konstantine Karantasis <ko...@confluent.io>
---
.../java/org/apache/kafka/connect/util/KafkaBasedLog.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e78276a..e301581 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -281,9 +281,15 @@ public class KafkaBasedLog<K, V> {
Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
- if (consumer.position(entry.getKey()) >= entry.getValue())
+ TopicPartition topicPartition = entry.getKey();
+ long endOffset = entry.getValue();
+ long lastConsumedOffset = consumer.position(topicPartition);
+ if (lastConsumedOffset >= endOffset) {
+ log.trace("Read to end offset {} for {}", endOffset, topicPartition);
it.remove();
- else {
+ } else {
+ log.trace("Behind end offset {} for {}; last-read offset is {}",
+ endOffset, topicPartition, lastConsumedOffset);
poll(Integer.MAX_VALUE);
break;
}
@@ -345,4 +351,4 @@ public class KafkaBasedLog<K, V> {
}
}
}
-}
\ No newline at end of file
+}