Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/15 14:50:52 UTC

[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r769662220



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -2300,21 +2300,32 @@ public ConsumerGroupMetadata groupMetadata() {
      * {@link org.apache.kafka.clients.consumer.ConsumerPartitionAssignor ConsumerPartitionAssignor}, you should not
      * use this API.
      *
+     * @param reason The reason why the new rebalance is needed.
+     *
      * @throws java.lang.IllegalStateException if the consumer does not use group subscription
      */
     @Override
-    public void enforceRebalance() {
+    public void enforceRebalance(final String reason) {
         acquireAndEnsureOpen();
         try {
             if (coordinator == null) {
                 throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
             }
-            coordinator.requestRejoin("rebalance enforced by user");
+            String defaultReason = "rebalance enforced by user";

Review comment:
       nit: Should we define this one as a constant?
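
A minimal sketch of what the constant might look like. The class name, the `resolveReason` helper, and the fall-back-on-null-or-empty behaviour are assumptions for illustration only; they are not taken from the PR.

```java
// Hypothetical stand-in for the consumer; not the actual KafkaConsumer code.
final class EnforceRebalanceSketch {
    // Default reason hoisted out of the method body into a constant.
    static final String DEFAULT_REASON = "rebalance enforced by user";

    // Assumed behaviour: use the caller's reason when one is given,
    // otherwise fall back to the default.
    static String resolveReason(final String reason) {
        if (reason == null || reason.isEmpty()) {
            return DEFAULT_REASON;
        }
        return reason;
    }

    public static void main(String[] args) {
        System.out.println(resolveReason(null));
        System.out.println(resolveReason("config changed"));
    }
}
```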

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 final RuntimeException exception = future.exception();
 
                 resetJoinGroupFuture();
+                rejoinReason = exception.getMessage();

Review comment:
       I wonder if we should put a bit more information here. For instance, we could say `rebalance failed due to %s error: %s` where the first `%s` would be the exception's class and the second the message. What do you think?
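
A rough sketch of the suggested format. The class and the `describeFailure` helper are hypothetical, and using `getSimpleName()` rather than the fully qualified class name is an assumption.

```java
// Hypothetical helper; not part of AbstractCoordinator.
final class RejoinReasonSketch {
    // Builds the reason from the exception's class and its message.
    static String describeFailure(final RuntimeException exception) {
        return String.format("rebalance failed due to %s error: %s",
            exception.getClass().getSimpleName(), exception.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(describeFailure(new IllegalStateException("coordinator unavailable")));
    }
}
```

Note that `getMessage()` can return null, in which case the formatted string ends with the literal text `null`.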

##########
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##########
@@ -54,6 +56,9 @@
         "about": "The protocol name." },
       { "name": "Metadata", "type": "bytes", "versions": "0+",
         "about": "The protocol metadata." }
-    ]}
+    ]},
+    { "name": "Reason", "type": "string",
+      "versions": "8+", "nullableVersions": "8+","default": "null",

Review comment:
       Could we also update `RequestResponseTest` to cover this change?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -541,8 +579,17 @@ private void expectDisconnectInJoinGroup(
         }, null, true);
     }
 
+    private void expectJoinGroup(
+            String expectedMemberId,
+            int responseGeneration,
+            String responseMemberId

Review comment:
       nit: Indentation is off.

##########
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##########
@@ -30,7 +30,9 @@
   // Version 6 is the first flexible version.
   //
   // Version 7 is the same as version 6.
-  "validVersions": "0-7",
+  //
+  // Version 8 adds the Reason field (KIP-800).
+  "validVersions": "0-8",

Review comment:
       We need to bump the request version as well.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -163,6 +163,7 @@ class GroupCoordinator(val brokerId: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
                       responseCallback: JoinCallback,
+                      reason: String = null,

Review comment:
       I would rather use `Option[String]` here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -133,6 +133,7 @@ public boolean hasNotJoinedGroup() {
     protected final ConsumerNetworkClient client;
 
     private Node coordinator = null;
+    private String rejoinReason = "initialized abstract coordinator";

Review comment:
       The reason is a bit weird. I wonder if we could just leave it empty in the beginning.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -181,6 +182,8 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
         case Some(group) =>
           group.inLock {
+            if (reason != null)
+              info(s"memberId=$memberId with groupInstanceId=$groupInstanceId is attempting to join groupId=$groupId due to: $reason")

Review comment:
       We already log a few messages when a member (re-)joins. I wonder if we should pass the reason down and use it in all the existing messages that we already have. What do you think? I need to look at this a bit more myself.

##########
File path: clients/src/main/resources/common/message/JoinGroupRequest.json
##########
@@ -54,6 +56,9 @@
         "about": "The protocol name." },
       { "name": "Metadata", "type": "bytes", "versions": "0+",
         "about": "The protocol metadata." }
-    ]}
+    ]},
+    { "name": "Reason", "type": "string",
+      "versions": "8+", "nullableVersions": "8+","default": "null",

Review comment:
       nit: Could we put `versions` on the previous line to stay in line with the other fields? A space is also missing before `default`.



