You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2020/06/24 13:36:48 UTC

[kafka] branch 2.3 updated: KAFKA-9150; DescribeGroup uses member assignment as metadata (#8888)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 71257a8  KAFKA-9150; DescribeGroup uses member assignment as metadata (#8888)
71257a8 is described below

commit 71257a80a901291f2064fc874cf2c7d26e45e298
Author: Dominic Evans <do...@uk.ibm.com>
AuthorDate: Wed Jun 24 14:34:30 2020 +0100

    KAFKA-9150; DescribeGroup uses member assignment as metadata (#8888)
    
    Author: David Jacot <dj...@confluent.io>
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
    
    Closes #7658 from dajac/KAFKA-9150
    
    (cherry picked from commit 54f8d0c3fcd9c61126e005835c4f91756cb399e5)
    Signed-off-by: Dominic Evans <do...@uk.ibm.com>
    
    Co-authored-by: David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 51 ++++++++++++++++++++--
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c883345..a873fb2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1266,7 +1266,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             .setClientId(member.clientId)
             .setClientHost(member.clientHost)
             .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.assignment)
+            .setMemberMetadata(member.metadata)
         }
 
         val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f2cdd42..c63c3b9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -20,12 +20,13 @@ package kafka.server
 import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.util
+import java.util.Random
 import java.util.{Collections, Optional}
 import java.util.Arrays.asList
 
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
 import kafka.controller.KafkaController
-import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.group.{GroupCoordinator, GroupSummary, MemberSummary}
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.SendResponse
@@ -48,9 +49,9 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
-import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
+import org.apache.kafka.common.message.{DescribeGroupsRequestData, HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
-import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
 
 import scala.collection.JavaConverters._
@@ -384,6 +385,50 @@ class KafkaApisTest {
     testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
   }
 
+  @Test
+  def testDescribeGroups(): Unit = {
+    val groupId = "groupId"
+    val random = new Random()
+    val metadata = new Array[Byte](10)
+    random.nextBytes(metadata)
+    val assignment = new Array[Byte](10)
+    random.nextBytes(assignment)
+
+    val memberSummary = MemberSummary("memberid", Some("instanceid"), "clientid", "clienthost", metadata, assignment)
+    val groupSummary = GroupSummary("Stable", "consumer", "roundrobin", List(memberSummary))
+
+    EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+    val (describeGroupsRequest, request) = buildRequest(new DescribeGroupsRequest.Builder(
+      new DescribeGroupsRequestData().setGroups(List(groupId).asJava)
+    ))
+
+    val capturedResponse = expectNoThrottling()
+    EasyMock.expect(groupCoordinator.handleDescribeGroup(EasyMock.eq(groupId)))
+      .andReturn((Errors.NONE, groupSummary))
+    EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+    createKafkaApis().handleDescribeGroupRequest(request)
+
+    val response = readResponse(ApiKeys.DESCRIBE_GROUPS, describeGroupsRequest, capturedResponse)
+      .asInstanceOf[DescribeGroupsResponse]
+
+    val group = response.data().groups().get(0)
+    assertEquals(Errors.NONE, Errors.forCode(group.errorCode()))
+    assertEquals(groupId, group.groupId())
+    assertEquals(groupSummary.state, group.groupState())
+    assertEquals(groupSummary.protocolType, group.protocolType())
+    assertEquals(groupSummary.protocol, group.protocolData())
+    assertEquals(groupSummary.members.size, group.members().size())
+
+    val member = group.members().get(0)
+    assertEquals(memberSummary.memberId, member.memberId())
+    assertEquals(memberSummary.clientId, member.clientId())
+    assertEquals(memberSummary.clientHost, member.clientHost())
+    assertArrayEquals(memberSummary.metadata, member.memberMetadata())
+    assertArrayEquals(memberSummary.assignment, member.memberAssignment())
+  }
+
   private def testListOffsetFailedGetLeaderReplica(error: Errors): Unit = {
     val tp = new TopicPartition("foo", 0)
     val isolationLevel = IsolationLevel.READ_UNCOMMITTED