You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2019/06/04 21:49:52 UTC
[kafka] branch trunk updated: Improve logging in the consumer for epoch updates (#6879)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 264d1d8 Improve logging in the consumer for epoch updates (#6879)
264d1d8 is described below
commit 264d1d8a8b7ede0fb8e3595d0153893966bb73b8
Author: David Arthur <mu...@gmail.com>
AuthorDate: Tue Jun 4 17:49:30 2019 -0400
Improve logging in the consumer for epoch updates (#6879)
---
clients/src/main/java/org/apache/kafka/clients/Metadata.java | 2 +-
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java | 5 ++++-
.../java/org/apache/kafka/clients/consumer/internals/Fetcher.java | 7 ++++---
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index f991fa6..94e4eb3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -177,7 +177,7 @@ public class Metadata implements Closeable {
}
return true;
} else {
- log.debug("Not replacing existing epoch {} with new epoch {}", oldEpoch, epoch);
+ log.debug("Not replacing existing epoch {} with new epoch {} for partition {}", oldEpoch, epoch, topicPartition);
return false;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
index b928b8e..b5c9de6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java
@@ -137,7 +137,10 @@ public class MetadataCache {
@Override
public String toString() {
return "MetadataCache{" +
- "cluster=" + cluster() +
+ "clusterId='" + clusterId + '\'' +
+ ", nodes=" + nodes +
+ ", partitions=" + metadataByPartition.values() +
+ ", controller=" + controller +
'}';
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e638963..cff6e30 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -861,13 +861,14 @@ public class Fetcher<K, V> implements Closeable {
final Map<TopicPartition, ListOffsetRequest.PartitionData> partitionDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
+ Long offset = entry.getValue();
Optional<MetadataCache.PartitionInfoAndEpoch> currentInfo = metadata.partitionInfoIfCurrent(tp);
if (!currentInfo.isPresent()) {
- log.debug("Leader for partition {} is unknown for fetching offset", tp);
+ log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
metadata.requestUpdate();
partitionsToRetry.add(tp);
} else if (currentInfo.get().partitionInfo().leader() == null) {
- log.debug("Leader for partition {} is unavailable for fetching offset", tp);
+ log.debug("Leader for partition {} is unavailable for fetching offset {}", tp, offset);
metadata.requestUpdate();
partitionsToRetry.add(tp);
} else if (client.isUnavailable(currentInfo.get().partitionInfo().leader())) {
@@ -881,7 +882,7 @@ public class Fetcher<K, V> implements Closeable {
partitionsToRetry.add(tp);
} else {
partitionDataMap.put(tp,
- new ListOffsetRequest.PartitionData(entry.getValue(), Optional.of(currentInfo.get().epoch())));
+ new ListOffsetRequest.PartitionData(offset, Optional.of(currentInfo.get().epoch())));
}
}
return regroupPartitionMapByNode(partitionDataMap);