You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2023/02/17 00:55:01 UTC
[kafka] branch trunk updated: KAFKA-14253 - More informative logging (#13253)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 82d5720aae7 KAFKA-14253 - More informative logging (#13253)
82d5720aae7 is described below
commit 82d5720aae78c9e17606c8345dfc208557f9a8f2
Author: Philip Nee <pn...@confluent.io>
AuthorDate: Thu Feb 16 16:54:50 2023 -0800
KAFKA-14253 - More informative logging (#13253)
Includes 2 requirements from the ticket:
* Include the number of members in the group (e.g., "15 members participating" and "to 15 clients as")
* Sort the member IDs (to make it easier to compare membership and assignment across rebalances)
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../internals/StreamsPartitionAssignor.java | 22 ++++++++++++++--------
1 file changed, 14 insertions(+), 8 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 1875f57b649..46c1e41e6c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -76,9 +76,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-
+import static java.util.Map.Entry.comparingByKey;
import static java.util.UUID.randomUUID;
-
import static org.apache.kafka.common.utils.Utils.filterMap;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@@ -619,10 +618,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final boolean lagComputationSuccessful =
populateClientStatesMap(clientStates, clientMetadataMap, taskForPartition, changelogTopics);
- log.info("All members participating in this rebalance: \n{}.",
- clientStates.entrySet().stream()
- .map(entry -> entry.getKey() + ": " + entry.getValue().consumers())
- .collect(Collectors.joining(Utils.NL)));
+ log.info("{} members participating in this rebalance: \n{}.",
+ clientStates.size(),
+ clientStates.entrySet().stream()
+ .sorted(comparingByKey())
+ .map(entry -> entry.getKey() + ": " + entry.getValue().consumers())
+ .collect(Collectors.joining(Utils.NL)));
final Set<TaskId> allTasks = partitionsForTask.keySet();
statefulTasks.addAll(changelogTopics.statefulTaskIds());
@@ -637,8 +638,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
statefulTasks,
assignmentConfigs);
- log.info("Assigned tasks {} including stateful {} to clients as: \n{}.",
- allTasks, statefulTasks, clientStates.entrySet().stream()
+ log.info("{} assigned tasks {} including stateful {} to {} clients as: \n{}.",
+ allTasks.size(),
+ allTasks,
+ statefulTasks,
+ clientStates.size(),
+ clientStates.entrySet().stream()
+ .sorted(comparingByKey())
.map(entry -> entry.getKey() + "=" + entry.getValue().currentAssignment())
.collect(Collectors.joining(Utils.NL)));