You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bo...@apache.org on 2020/07/07 03:34:32 UTC

[kafka] branch 2.4 updated: KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989)

This is an automated email from the ASF dual-hosted git repository.

boyang pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new b8e1cb6  KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989)
b8e1cb6 is described below

commit b8e1cb6745b56f211e91b5a18e2a78f6ae967dd0
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Mon Jul 6 18:50:40 2020 -0700

    KAFKA-10239: Make GroupInstanceId ignorable in DescribeGroups (#8989)
    
    This is a bug fix for older admin clients that use static membership and call DescribeGroups.
    By making GroupInstanceId ignorable, handling the response no longer crashes.
    
    Also added test coverage for DescribeGroups, along with some side cleanups.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../common/requests/DescribeGroupsResponse.java    |  12 +--
 .../common/message/DescribeGroupsResponse.json     |   4 +-
 .../apache/kafka/common/message/MessageTest.java   | 107 ++++++++++++++++++---
 3 files changed, 100 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index cb369a0..3de2a56 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -76,15 +76,15 @@ public class DescribeGroupsResponse extends AbstractResponse {
         final String protocol,
         final List<DescribedGroupMember> members,
         final Set<Byte> authorizedOperations) {
-        DescribedGroup groupMetada = new DescribedGroup();
-        groupMetada.setGroupId(groupId)
+        DescribedGroup groupMetadata = new DescribedGroup();
+        groupMetadata.setGroupId(groupId)
             .setErrorCode(error.code())
             .setGroupState(state)
             .setProtocolType(protocolType)
             .setProtocolData(protocol)
             .setMembers(members)
             .setAuthorizedOperations(Utils.to32BitField(authorizedOperations));
-        return  groupMetada;
+        return  groupMetadata;
     }
 
     public static DescribedGroup groupMetadata(
@@ -95,15 +95,15 @@ public class DescribeGroupsResponse extends AbstractResponse {
         final String protocol,
         final List<DescribedGroupMember> members,
         final int authorizedOperations) {
-        DescribedGroup groupMetada = new DescribedGroup();
-        groupMetada.setGroupId(groupId)
+        DescribedGroup groupMetadata = new DescribedGroup();
+        groupMetadata.setGroupId(groupId)
             .setErrorCode(error.code())
             .setGroupState(state)
             .setProtocolType(protocolType)
             .setProtocolData(protocol)
             .setMembers(members)
             .setAuthorizedOperations(authorizedOperations);
-        return  groupMetada;
+        return  groupMetadata;
     }
 
     public DescribeGroupsResponseData data() {
diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
index 7e8a4d9..f195843 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
@@ -23,7 +23,7 @@
   //
   // Starting in version 3, brokers can send authorized operations.
   //
-  // Starting in version 4, the response will include group.instance.id info for members.
+  // Starting in version 4, the response will optionally include group.instance.id info for members.
   //
   // Version 5 is the first flexible version.
   "validVersions": "0-5",
@@ -49,7 +49,7 @@
         "about": "The group members.", "fields": [
         { "name": "MemberId", "type": "string", "versions": "0+",
           "about": "The member ID assigned by the group coordinator." },
-        { "name": "GroupInstanceId", "type": "string", "versions": "4+",
+        { "name": "GroupInstanceId", "type": "string", "versions": "4+", "ignorable": true,
           "nullableVersions": "4+", "default": "null",
           "about": "The unique identifier of the consumer instance provided by end user." },
         { "name": "ClientId", "type": "string", "versions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index c7dedf9..eba5731 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -20,6 +20,8 @@ package org.apache.kafka.common.message;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
+import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
 import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
@@ -216,6 +218,86 @@ public final class MessageTest {
     }
 
     @Test
+    public void testDescribeGroupsRequestVersions() throws Exception {
+        testAllMessageRoundTrips(new DescribeGroupsRequestData()
+                .setGroups(Collections.singletonList("group"))
+                .setIncludeAuthorizedOperations(false));
+    }
+
+    @Test
+    public void testDescribeGroupsResponseVersions() throws Exception {
+        DescribedGroupMember baseMember = new DescribedGroupMember()
+            .setMemberId(memberId);
+
+        DescribedGroup baseGroup = new DescribedGroup()
+                                       .setGroupId("group")
+                                       .setGroupState("Stable").setErrorCode(Errors.NONE.code())
+                                       .setMembers(Collections.singletonList(baseMember))
+                                       .setProtocolType("consumer");
+        DescribeGroupsResponseData baseResponse = new DescribeGroupsResponseData()
+                                                      .setGroups(Collections.singletonList(baseGroup));
+        testAllMessageRoundTrips(baseResponse);
+
+        testAllMessageRoundTripsFromVersion((short) 1, baseResponse.setThrottleTimeMs(10));
+
+        baseGroup.setAuthorizedOperations(1);
+        testAllMessageRoundTripsFromVersion((short) 3, baseResponse);
+
+        baseMember.setGroupInstanceId(instanceId);
+        testAllMessageRoundTripsFromVersion((short) 4, baseResponse);
+    }
+
+    @Test
+    public void testGroupInstanceIdIgnorableInDescribeGroupsResponse() throws Exception {
+        DescribeGroupsResponseData responseWithGroupInstanceId =
+            new DescribeGroupsResponseData()
+                .setGroups(Collections.singletonList(
+                    new DescribedGroup()
+                        .setGroupId("group")
+                        .setGroupState("Stable")
+                        .setErrorCode(Errors.NONE.code())
+                        .setMembers(Collections.singletonList(
+                            new DescribedGroupMember()
+                                .setMemberId(memberId)
+                                .setGroupInstanceId(instanceId)))
+                        .setProtocolType("consumer")
+                ));
+
+        DescribeGroupsResponseData expectedResponse = new DescribeGroupsResponseData(
+            responseWithGroupInstanceId.toStruct(responseWithGroupInstanceId.highestSupportedVersion()),
+            responseWithGroupInstanceId.highestSupportedVersion());
+        // Unset GroupInstanceId
+        expectedResponse.groups().get(0).members().get(0).setGroupInstanceId(null);
+
+        testAllMessageRoundTripsBeforeVersion((short) 4, responseWithGroupInstanceId, expectedResponse);
+    }
+
+    @Test
+    public void testThrottleTimeIgnorableInDescribeGroupsResponse() throws Exception {
+        DescribeGroupsResponseData responseWithGroupInstanceId =
+            new DescribeGroupsResponseData()
+                .setGroups(Collections.singletonList(
+                    new DescribedGroup()
+                        .setGroupId("group")
+                        .setGroupState("Stable")
+                        .setErrorCode(Errors.NONE.code())
+                        .setMembers(Collections.singletonList(
+                            new DescribedGroupMember()
+                                .setMemberId(memberId)))
+                        .setProtocolType("consumer")
+                ))
+                .setThrottleTimeMs(10);
+
+        DescribeGroupsResponseData expectedResponse = new DescribeGroupsResponseData(
+            responseWithGroupInstanceId.toStruct(responseWithGroupInstanceId.highestSupportedVersion()),
+            responseWithGroupInstanceId.highestSupportedVersion());
+        // Unset throttle time
+        expectedResponse.setThrottleTimeMs(0);
+
+        testAllMessageRoundTripsBeforeVersion((short) 1, responseWithGroupInstanceId, expectedResponse);
+    }
+
+    @Test
     public void testOffsetForLeaderEpochVersions() throws Exception {
         // Version 2 adds optional current leader epoch
         OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partitionDataNoCurrentEpoch =
@@ -244,7 +326,6 @@ public final class MessageTest {
         testAllMessageRoundTripsBeforeVersion((short) 3,
                 new OffsetForLeaderEpochRequestData().setReplicaId(5),
                 new OffsetForLeaderEpochRequestData().setReplicaId(-2));
-
     }
 
     @Test
@@ -672,7 +753,7 @@ public final class MessageTest {
      * Test that the JSON request files match the schemas accessible through the ApiKey class.
      */
     @Test
-    public void testRequestSchemas() throws Exception {
+    public void testRequestSchemas() {
         for (ApiKeys apiKey : ApiKeys.values()) {
             Schema[] manualSchemas = apiKey.requestSchemas;
             Schema[] generatedSchemas = ApiMessageType.fromApiKey(apiKey.id).requestSchemas();
@@ -728,13 +809,9 @@ public final class MessageTest {
                 return true;
             }
             if (type.getClass().equals(Type.RECORDS.getClass())) {
-                if (other.type.getClass().equals(Type.NULLABLE_BYTES.getClass())) {
-                    return true;
-                }
+                return other.type.getClass().equals(Type.NULLABLE_BYTES.getClass());
             } else if (type.getClass().equals(Type.NULLABLE_BYTES.getClass())) {
-                if (other.type.getClass().equals(Type.RECORDS.getClass())) {
-                    return true;
-                }
+                return other.type.getClass().equals(Type.RECORDS.getClass());
             }
             return false;
         }
@@ -799,7 +876,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testDefaultValues() throws Exception {
+    public void testDefaultValues() {
         verifyWriteRaisesUve((short) 0, "validateOnly",
             new CreateTopicsRequestData().setValidateOnly(true));
         verifyWriteSucceeds((short) 0,
@@ -812,7 +889,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testNonIgnorableFieldWithDefaultNull() throws Exception {
+    public void testNonIgnorableFieldWithDefaultNull() {
         // Test non-ignorable string field `groupInstanceId` with default null
         verifyWriteRaisesUve((short) 0, "groupInstanceId", new HeartbeatRequestData()
                 .setGroupId("groupId")
@@ -831,7 +908,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testWriteNullForNonNullableFieldRaisesException() throws Exception {
+    public void testWriteNullForNonNullableFieldRaisesException() {
         CreateTopicsRequestData createTopics = new CreateTopicsRequestData().setTopics(null);
         for (short i = (short) 0; i <= createTopics.highestSupportedVersion(); i++) {
             verifyWriteRaisesNpe(i, createTopics);
@@ -841,7 +918,7 @@ public final class MessageTest {
     }
 
     @Test
-    public void testUnknownTaggedFields() throws Exception {
+    public void testUnknownTaggedFields() {
         CreateTopicsRequestData createTopics = new CreateTopicsRequestData();
         verifyWriteSucceeds((short) 6, createTopics);
         RawTaggedField field1000 = new RawTaggedField(1000, new byte[] {0x1, 0x2, 0x3});
@@ -850,7 +927,7 @@ public final class MessageTest {
         verifyWriteSucceeds((short) 6, createTopics);
     }
 
-    private void verifyWriteRaisesNpe(short version, Message message) throws Exception {
+    private void verifyWriteRaisesNpe(short version, Message message) {
         ObjectSerializationCache cache = new ObjectSerializationCache();
         assertThrows(NullPointerException.class, () -> {
             int size = message.size(cache, version);
@@ -862,7 +939,7 @@ public final class MessageTest {
 
     private void verifyWriteRaisesUve(short version,
                                       String problemText,
-                                     Message message) throws Exception {
+                                      Message message) {
         ObjectSerializationCache cache = new ObjectSerializationCache();
         UnsupportedVersionException e =
             assertThrows(UnsupportedVersionException.class, () -> {
@@ -876,7 +953,7 @@ public final class MessageTest {
                 e.getMessage().contains(problemText));
     }
 
-    private void verifyWriteSucceeds(short version, Message message) throws Exception {
+    private void verifyWriteSucceeds(short version, Message message) {
         ObjectSerializationCache cache = new ObjectSerializationCache();
         int size = message.size(cache, version);
         ByteBuffer buf = ByteBuffer.allocate(size * 2);