You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/05/07 19:05:46 UTC

[kafka] branch trunk updated: MINOR: Log4j Improvements on Fetcher (#8629)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71397ad  MINOR: Log4j Improvements on Fetcher (#8629)
71397ad is described below

commit 71397adaff8fb4e9dd126c51cd38110cbd813936
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu May 7 12:03:43 2020 -0700

    MINOR: Log4j Improvements on Fetcher (#8629)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/clients/consumer/internals/Fetcher.java   | 6 ++++--
 .../java/org/apache/kafka/common/record/DefaultRecordBatch.java     | 2 ++
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 0699684..68c7347 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -676,13 +676,15 @@ public class Fetcher<K, V> implements Closeable {
             if (completedFetch.nextFetchOffset == position.offset) {
                 List<ConsumerRecord<K, V>> partRecords = completedFetch.fetchRecords(maxRecords);
 
+                log.trace("Returning {} fetched records at offset {} for assigned partition {}",
+                        partRecords.size(), position, completedFetch.partition);
+
                 if (completedFetch.nextFetchOffset > position.offset) {
                     SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
-                    log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
-                            "position to {}", position, completedFetch.partition, nextPosition);
+                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
                     subscriptions.position(completedFetch.partition, nextPosition);
                 }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index d4a9587..b49f2fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -484,6 +484,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     @Override
     public String toString() {
         return "RecordBatch(magic=" + magic() + ", offsets=[" + baseOffset() + ", " + lastOffset() + "], " +
+                "sequence=[" + baseSequence() + ", " + lastSequence() + "], " +
+                "isTransactional=" + isTransactional() + ", isControlBatch=" + isControlBatch() + ", " +
                 "compression=" + compressionType() + ", timestampType=" + timestampType() + ", crc=" + checksum() + ")";
     }