You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/07 19:05:46 UTC
[kafka] branch trunk updated: MINOR: Log4j Improvements on Fetcher
(#8629)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71397ad MINOR: Log4j Improvements on Fetcher (#8629)
71397ad is described below
commit 71397adaff8fb4e9dd126c51cd38110cbd813936
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu May 7 12:03:43 2020 -0700
MINOR: Log4j Improvements on Fetcher (#8629)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../java/org/apache/kafka/clients/consumer/internals/Fetcher.java | 6 ++++--
.../java/org/apache/kafka/common/record/DefaultRecordBatch.java | 2 ++
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 0699684..68c7347 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -676,13 +676,15 @@ public class Fetcher<K, V> implements Closeable {
if (completedFetch.nextFetchOffset == position.offset) {
List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
+ log.trace("Returning {} fetched records at offset {} for assigned partition {}",
+ partRecords.size(), position, completedFetch.partition);
+
if (completedFetch.nextFetchOffset > position.offset) {
SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
completedFetch.nextFetchOffset,
completedFetch.lastEpoch,
position.currentLeader);
- log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
- "position to {}", position, completedFetch.partition, nextPosition);
+ log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
subscriptions.position(completedFetch.partition, nextPosition);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index d4a9587..b49f2fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -484,6 +484,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
@Override
public String toString() {
return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " +
+ "sequence=[" + baseSequence() + ", " + lastSequence() + "], " +
+ "isTransactional=" + isTransactional() + ", isControlBatch=" + isControlBatch() + ", " +
"compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")";
}