You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2023/02/17 00:55:01 UTC

[kafka] branch trunk updated: KAFKA-14253 - More informative logging (#13253)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 82d5720aae7 KAFKA-14253 - More informative logging (#13253)
82d5720aae7 is described below

commit 82d5720aae78c9e17606c8345dfc208557f9a8f2
Author: Philip Nee <pn...@confluent.io>
AuthorDate: Thu Feb 16 16:54:50 2023 -0800

    KAFKA-14253 - More informative logging (#13253)
    
    Includes 2 requirements from the ticket:
    * Include the number of members in the group (I.e., "15 members participating" and "to 15 clients as")
    * Sort the member ids (to help compare the membership and assignment across rebalances)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../internals/StreamsPartitionAssignor.java        | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 1875f57b649..46c1e41e6c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -76,9 +76,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-
+import static java.util.Map.Entry.comparingByKey;
 import static java.util.UUID.randomUUID;
-
 import static org.apache.kafka.common.utils.Utils.filterMap;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@@ -619,10 +618,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         final boolean lagComputationSuccessful =
             populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics);
 
-        log.info("All members participating in this rebalance: \n{}.",
-                 clientStates.entrySet().stream()
-                     .map(entry -> entry.getKey() + ": " + entry.getValue().consumers())
-                     .collect(Collectors.joining(Utils.NL)));
+        log.info("{} members participating in this rebalance: \n{}.",
+                clientStates.size(),
+                clientStates.entrySet().stream()
+                        .sorted(comparingByKey())
+                        .map(entry -> entry.getKey() + ": " + entry.getValue().consumers())
+                        .collect(Collectors.joining(Utils.NL)));
 
         final Set<TaskId> allTasks = partitionsForTask.keySet();
         statefulTasks.addAll(changelogTopics.statefulTaskIds());
@@ -637,8 +638,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
                                                                    statefulTasks,
                                                                    assignmentConfigs);
 
-        log.info("Assigned tasks {} including stateful {} to clients as: \n{}.",
-                allTasks, statefulTasks, clientStates.entrySet().stream()
+        log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.",
+                allTasks.size(),
+                allTasks,
+                statefulTasks,
+                clientStates.size(),
+                clientStates.entrySet().stream()
+                        .sorted(comparingByKey())
                         .map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment())
                         .collect(Collectors.joining(Utils.NL)));