You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/07 05:09:41 UTC

[kafka] branch 2.4 updated: KAFKA-9150; DescribeGroup uses member assignment as metadata

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 975b4bd  KAFKA-9150; DescribeGroup uses member assignment as metadata
975b4bd is described below

commit 975b4bde4f6e75f07dc44fa907da28d3542f6ef4
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Nov 7 10:16:26 2019 +0530

    KAFKA-9150; DescribeGroup uses member assignment as metadata
    
    Author: David Jacot <dj...@confluent.io>
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
    
    Closes #7658 from dajac/KAFKA-9150
    
    (cherry picked from commit 54f8d0c3fcd9c61126e005835c4f91756cb399e5)
    Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 52 ++++++++++++++++++++--
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f194485..abfd03a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1320,7 +1320,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             .setClientId(member.clientId)
             .setClientHost(member.clientHost)
             .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.assignment)
+            .setMemberMetadata(member.metadata)
         }
 
         val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 66bed52..ddc6b2f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -20,12 +20,13 @@ package kafka.server
 import java.net.InetAddress
 import java.nio.charset.StandardCharsets
 import java.util
+import java.util.Random
 import java.util.{Collections, Optional}
 import java.util.Arrays.asList
 
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
 import kafka.controller.KafkaController
-import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.group.{GroupCoordinator, GroupSummary, MemberSummary}
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.SendResponse
@@ -46,14 +47,14 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
-import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData}
+import org.apache.kafka.common.message.{DescribeGroupsRequestData, HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData}
 import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
 import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.server.authorizer.Authorizer
-import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
 
 import scala.collection.JavaConverters._
@@ -395,6 +396,51 @@ class KafkaApisTest {
   }
 
   @Test
+  def testDescribeGroups(): Unit = {
+    val groupId = "groupId"
+    val random = new Random()
+    val metadata = new Array[Byte](10)
+    random.nextBytes(metadata)
+    val assignment = new Array[Byte](10)
+    random.nextBytes(assignment)
+
+    val memberSummary = MemberSummary("memberid", Some("instanceid"), "clientid", "clienthost", metadata, assignment)
+    val groupSummary = GroupSummary("Stable", "consumer", "roundrobin", List(memberSummary))
+
+    EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+    val (describeGroupsRequest, request) = buildRequest(new DescribeGroupsRequest.Builder(
+      new DescribeGroupsRequestData().setGroups(List(groupId).asJava)
+    ))
+
+    val capturedResponse = expectNoThrottling()
+    EasyMock.expect(groupCoordinator.handleDescribeGroup(EasyMock.eq(groupId)))
+      .andReturn((Errors.NONE, groupSummary))
+    EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+    createKafkaApis().handleDescribeGroupRequest(request)
+
+    val response = readResponse(ApiKeys.DESCRIBE_GROUPS, describeGroupsRequest, capturedResponse)
+      .asInstanceOf[DescribeGroupsResponse]
+
+    val group = response.data().groups().get(0)
+    assertEquals(Errors.NONE, Errors.forCode(group.errorCode()))
+    assertEquals(groupId, group.groupId())
+    assertEquals(groupSummary.state, group.groupState())
+    assertEquals(groupSummary.protocolType, group.protocolType())
+    assertEquals(groupSummary.protocol, group.protocolData())
+    assertEquals(groupSummary.members.size, group.members().size())
+
+    val member = group.members().get(0)
+    assertEquals(memberSummary.memberId, member.memberId())
+    assertEquals(memberSummary.groupInstanceId.orNull, member.groupInstanceId())
+    assertEquals(memberSummary.clientId, member.clientId())
+    assertEquals(memberSummary.clientHost, member.clientHost())
+    assertArrayEquals(memberSummary.metadata, member.memberMetadata())
+    assertArrayEquals(memberSummary.assignment, member.memberAssignment())
+  }
+
+  @Test
   def testOffsetDeleteWithInvalidPartition(): Unit = {
     val group = "groupId"
     val topic = "topic"