You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/07/12 07:44:05 UTC

[kafka] branch 3.2 updated: KAFKA-14013: Limit the length of the `reason` field sent on the wire (#12388)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 74e24deedb5 KAFKA-14013: Limit the length of the `reason` field sent on the wire (#12388)
74e24deedb5 is described below

commit 74e24deedb549d403bb2e9d17b98dd6fbd81b0d3
Author: Eugene Tolbakov <Eu...@hyde-housing.co.uk>
AuthorDate: Tue Jul 12 08:31:16 2022 +0100

    KAFKA-14013: Limit the length of the `reason` field sent on the wire (#12388)
    
    KIP-800 added the `reason` field to the JoinGroupRequest and the LeaveGroupRequest as a means to provide more information to the group coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we discovered that the size of the field is limited to 32767 chars by our serialisation mechanism. At the moment, the field, either provided directly by the user or constructed internally, is set directly regardless of its length.
    
    This patch sends only the first 255 chars of the user-provided or internally generated reason on the wire. Given the purpose of this field, that seems acceptable and that should still provide enough information to operators to understand the cause of a rebalance.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  3 +-
 .../consumer/internals/AbstractCoordinator.java    | 10 +++----
 .../kafka/common/requests/JoinGroupRequest.java    | 14 ++++++++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  7 +++++
 .../internals/AbstractCoordinatorTest.java         | 32 ++++++++++++++++++++--
 5 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index cf99556b0f2..15d60a50287 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -214,6 +214,7 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -3740,7 +3741,7 @@ public class KafkaAdminClient extends AdminClient {
     public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
                                                                                RemoveMembersFromConsumerGroupOptions options) {
         String reason = options.reason() == null || options.reason().isEmpty() ?
-            DEFAULT_LEAVE_GROUP_REASON : options.reason();
+            DEFAULT_LEAVE_GROUP_REASON : JoinGroupRequest.maybeTruncateReason(options.reason());
 
         List<MemberIdentity> members;
         if (options.removeAll()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 937ab79f2dd..5fe8a6a0e15 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -478,11 +478,11 @@ public abstract class AbstractCoordinator implements Closeable {
 
                 resetJoinGroupFuture();
                 synchronized (AbstractCoordinator.this) {
-                    final String shortReason = String.format("rebalance failed due to %s",
-                        exception.getClass().getSimpleName());
+                    final String simpleName = exception.getClass().getSimpleName();
+                    final String shortReason = String.format("rebalance failed due to %s", simpleName);
                     final String fullReason = String.format("rebalance failed due to '%s' (%s)",
                         exception.getMessage(),
-                        exception.getClass().getSimpleName());
+                        simpleName);
                     requestRejoin(shortReason, fullReason);
                 }
 
@@ -559,7 +559,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         .setProtocolType(protocolType())
                         .setProtocols(metadata())
                         .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
-                        .setReason(this.rejoinReason)
+                        .setReason(JoinGroupRequest.maybeTruncateReason(this.rejoinReason))
         );
 
         log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
@@ -1111,7 +1111,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 generation.memberId, coordinator, leaveReason);
             LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
                 rebalanceConfig.groupId,
-                Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason))
+                Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
             );
 
             future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 220a59d1834..774506357bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -70,6 +70,20 @@ public class JoinGroupRequest extends AbstractRequest {
         });
     }
 
+    /**
+     * Ensures that the provided {@code reason} remains within a range of 255 chars.
+     * @param reason This is the reason that is sent to the broker over the wire
+     *               as a part of {@code JoinGroupRequest} or {@code LeaveGroupRequest} messages.
+     * @return a provided reason as is or truncated reason if it exceeds the 255 chars threshold.
+     */
+    public static String maybeTruncateReason(final String reason) {
+        if (reason.length() > 255) {
+            return reason.substring(0, 255);
+        } else {
+            return reason;
+        }
+    }
+
     public JoinGroupRequest(JoinGroupRequestData data, short version) {
         super(ApiKeys.JOIN_GROUP, version);
         this.data = data;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c337831b072..492cb12a0be 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -3981,6 +3981,13 @@ public class KafkaAdminClientTest {
         testRemoveMembersFromGroup("testing remove members reason", "testing remove members reason");
     }
 
+    @Test
+    public void testRemoveMembersFromGroupTruncatesReason() throws Exception {
+        final String reason = "Very looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong reason that is 271 characters long to make sure that length limit logic handles the scenario nicely";
+        final String truncatedReason = reason.substring(0, 255);
+        testRemoveMembersFromGroup(reason, truncatedReason);
+    }
+
     @Test
     public void testRemoveMembersFromGroupDefaultReason() throws Exception {
         testRemoveMembersFromGroup(null, DEFAULT_LEAVE_GROUP_REASON);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index c9661303b73..7cf6ee0e66c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -522,6 +522,15 @@ public class AbstractCoordinatorTest {
         expectSyncGroup(generation, memberId);
         ensureActiveGroup(generation, memberId);
         assertEquals("", coordinator.rejoinReason());
+
+        // check limit length of reason field
+        final String reason = "Very looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong reason that is 271 characters long to make sure that length limit logic handles the scenario nicely";
+        final String truncatedReason = reason.substring(0, 255);
+        expectJoinGroup(memberId, truncatedReason, generation, memberId);
+        expectSyncGroup(generation, memberId);
+        coordinator.requestRejoin(reason);
+        ensureActiveGroup(generation, memberId);
+        assertEquals("", coordinator.rejoinReason());
     }
 
     private void ensureActiveGroup(
@@ -1094,6 +1103,19 @@ public class AbstractCoordinatorTest {
         assertTrue(leaveGroupFuture.succeeded());
     }
 
+    @Test
+    public void testHandleNormalLeaveGroupResponseAndTruncatedLeaveReason() {
+        MemberResponse memberResponse = new MemberResponse()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.NONE.code());
+        LeaveGroupResponse response =
+                leaveGroupResponse(Collections.singletonList(memberResponse));
+        String leaveReason = "Very looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong leaveReason that is 271 characters long to make sure that length limit logic handles the scenario nicely";
+        RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response, leaveReason, leaveReason.substring(0, 255));
+        assertNotNull(leaveGroupFuture);
+        assertTrue(leaveGroupFuture.succeeded());
+    }
+
     @Test
     public void testHandleMultipleMembersLeaveGroupResponse() {
         MemberResponse memberResponse = new MemberResponse()
@@ -1128,6 +1150,12 @@ public class AbstractCoordinatorTest {
     }
 
     private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse leaveGroupResponse) {
+        return setupLeaveGroup(leaveGroupResponse, "test maybe leave group", "test maybe leave group");
+    }
+
+    private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse leaveGroupResponse,
+                                                String leaveReason,
+                                                String expectedLeaveReason) {
         setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -1139,11 +1167,11 @@ public class AbstractCoordinatorTest {
             }
             LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data();
             return leaveGroupRequest.members().get(0).memberId().equals(memberId) &&
-                   leaveGroupRequest.members().get(0).reason().equals("test maybe leave group");
+                   leaveGroupRequest.members().get(0).reason().equals(expectedLeaveReason);
         }, leaveGroupResponse);
 
         coordinator.ensureActiveGroup();
-        return coordinator.maybeLeaveGroup("test maybe leave group");
+        return coordinator.maybeLeaveGroup(leaveReason);
     }
 
     @Test