You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/07/12 07:37:47 UTC

[kafka] branch 3.3 updated: KAFKA-14013: Limit the length of the `reason` field sent on the wire (#12388)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new da055f121e7 KAFKA-14013: Limit the length of the `reason` field sent on the wire (#12388)
da055f121e7 is described below

commit da055f121e7cab109ee1573fe925c453d567c576
Author: Eugene Tolbakov <Eu...@hyde-housing.co.uk>
AuthorDate: Tue Jul 12 08:31:16 2022 +0100

    KAFKA-14013: Limit the length of the `reason` field sent on the wire (#12388)
    
    KIP-800 added the `reason` field to the JoinGroupRequest and the LeaveGroupRequest as a means to provide more information to the group coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we discovered that the size of the field is limited to 32767 chars by our serialisation mechanism. At the moment, the field, whether provided directly by the user or constructed internally, is set as-is regardless of its length.
    
    This patch sends only the first 255 chars of the user-provided or internally generated reason on the wire. Given the purpose of this field, that seems acceptable and it should still provide enough information for operators to understand the cause of a rebalance.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  3 +-
 .../consumer/internals/AbstractCoordinator.java    | 10 +++----
 .../kafka/common/requests/JoinGroupRequest.java    | 14 ++++++++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  7 +++++
 .../internals/AbstractCoordinatorTest.java         | 32 ++++++++++++++++++++--
 5 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index aad2610c94a..2b2642e3518 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -218,6 +218,7 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
 import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
 import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListGroupsRequest;
 import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
@@ -3756,7 +3757,7 @@ public class KafkaAdminClient extends AdminClient {
     public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
                                                                                RemoveMembersFromConsumerGroupOptions options) {
         String reason = options.reason() == null || options.reason().isEmpty() ?
-            DEFAULT_LEAVE_GROUP_REASON : options.reason();
+            DEFAULT_LEAVE_GROUP_REASON : JoinGroupRequest.maybeTruncateReason(options.reason());
 
         List<MemberIdentity> members;
         if (options.removeAll()) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index fa80727d5af..c9ad797ebeb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -478,11 +478,11 @@ public abstract class AbstractCoordinator implements Closeable {
 
                 resetJoinGroupFuture();
                 synchronized (AbstractCoordinator.this) {
-                    final String shortReason = String.format("rebalance failed due to %s",
-                        exception.getClass().getSimpleName());
+                    final String simpleName = exception.getClass().getSimpleName();
+                    final String shortReason = String.format("rebalance failed due to %s", simpleName);
                     final String fullReason = String.format("rebalance failed due to '%s' (%s)",
                         exception.getMessage(),
-                        exception.getClass().getSimpleName());
+                        simpleName);
                     requestRejoin(shortReason, fullReason);
                 }
 
@@ -559,7 +559,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         .setProtocolType(protocolType())
                         .setProtocols(metadata())
                         .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
-                        .setReason(this.rejoinReason)
+                        .setReason(JoinGroupRequest.maybeTruncateReason(this.rejoinReason))
         );
 
         log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
@@ -1114,7 +1114,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 generation.memberId, coordinator, leaveReason);
             LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
                 rebalanceConfig.groupId,
-                Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason))
+                Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
             );
 
             future = client.send(coordinator, request).compose(new LeaveGroupResponseHandler(generation));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 220a59d1834..774506357bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -70,6 +70,20 @@ public class JoinGroupRequest extends AbstractRequest {
         });
     }
 
+    /**
+     * Ensures that the provided {@code reason} remains within a range of 255 chars.
+     * @param reason This is the reason that is sent to the broker over the wire
+     *               as a part of {@code JoinGroupRequest} or {@code LeaveGroupRequest} messages.
+     * @return a provided reason as is or truncated reason if it exceeds the 255 chars threshold.
+     */
+    public static String maybeTruncateReason(final String reason) {
+        if (reason.length() > 255) {
+            return reason.substring(0, 255);
+        } else {
+            return reason;
+        }
+    }
+
     public JoinGroupRequest(JoinGroupRequestData data, short version) {
         super(ApiKeys.JOIN_GROUP, version);
         this.data = data;
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 523a961f964..61a2aaa00b2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4117,6 +4117,13 @@ public class KafkaAdminClientTest {
         testRemoveMembersFromGroup("testing remove members reason", "testing remove members reason");
     }
 
+    @Test
+    public void testRemoveMembersFromGroupTruncatesReason() throws Exception {
+        final String reason = "Very looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong reason that is 271 characters long to make sure that length limit logic handles the scenario nicely";
+        final String truncatedReason = reason.substring(0, 255);
+        testRemoveMembersFromGroup(reason, truncatedReason);
+    }
+
     @Test
     public void testRemoveMembersFromGroupDefaultReason() throws Exception {
         testRemoveMembersFromGroup(null, DEFAULT_LEAVE_GROUP_REASON);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 6812af29ce5..ddbebb6dde6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -571,6 +571,15 @@ public class AbstractCoordinatorTest {
         expectSyncGroup(generation, memberId);
         ensureActiveGroup(generation, memberId);
         assertEquals("", coordinator.rejoinReason());
+
+        // check limit length of reason field
+        final String reason = "Very looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong reason that is 271 characters long to make sure that length limit logic handles the scenario nicely";
+        final String truncatedReason = reason.substring(0, 255);
+        expectJoinGroup(memberId, truncatedReason, generation, memberId);
+        expectSyncGroup(generation, memberId);
+        coordinator.requestRejoin(reason);
+        ensureActiveGroup(generation, memberId);
+        assertEquals("", coordinator.rejoinReason());
     }
 
     private void ensureActiveGroup(
@@ -1159,6 +1168,19 @@ public class AbstractCoordinatorTest {
         assertTrue(leaveGroupFuture.succeeded());
     }
 
+    @Test
+    public void testHandleNormalLeaveGroupResponseAndTruncatedLeaveReason() {
+        MemberResponse memberResponse = new MemberResponse()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.NONE.code());
+        LeaveGroupResponse response =
+                leaveGroupResponse(Collections.singletonList(memberResponse));
+        String leaveReason = "Very looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong leaveReason that is 271 characters long to make sure that length limit logic handles the scenario nicely";
+        RequestFuture<Void> leaveGroupFuture = setupLeaveGroup(response, leaveReason, leaveReason.substring(0, 255));
+        assertNotNull(leaveGroupFuture);
+        assertTrue(leaveGroupFuture.succeeded());
+    }
+
     @Test
     public void testHandleMultipleMembersLeaveGroupResponse() {
         MemberResponse memberResponse = new MemberResponse()
@@ -1193,6 +1215,12 @@ public class AbstractCoordinatorTest {
     }
 
     private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse leaveGroupResponse) {
+        return setupLeaveGroup(leaveGroupResponse, "test maybe leave group", "test maybe leave group");
+    }
+
+    private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse leaveGroupResponse,
+                                                String leaveReason,
+                                                String expectedLeaveReason) {
         setupCoordinator(RETRY_BACKOFF_MS, Integer.MAX_VALUE, Optional.empty());
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -1204,11 +1232,11 @@ public class AbstractCoordinatorTest {
             }
             LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data();
             return leaveGroupRequest.members().get(0).memberId().equals(memberId) &&
-                   leaveGroupRequest.members().get(0).reason().equals("test maybe leave group");
+                   leaveGroupRequest.members().get(0).reason().equals(expectedLeaveReason);
         }, leaveGroupResponse);
 
         coordinator.ensureActiveGroup();
-        return coordinator.maybeLeaveGroup("test maybe leave group");
+        return coordinator.maybeLeaveGroup(leaveReason);
     }
 
     @Test