You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/11/07 05:09:41 UTC
[kafka] branch 2.4 updated: KAFKA-9150;
DescribeGroup uses member assignment as metadata
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 975b4bd KAFKA-9150; DescribeGroup uses member assignment as metadata
975b4bd is described below
commit 975b4bde4f6e75f07dc44fa907da28d3542f6ef4
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Nov 7 10:16:26 2019 +0530
KAFKA-9150; DescribeGroup uses member assignment as metadata
Author: David Jacot <dj...@confluent.io>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #7658 from dajac/KAFKA-9150
(cherry picked from commit 54f8d0c3fcd9c61126e005835c4f91756cb399e5)
Signed-off-by: Manikumar Reddy <ma...@confluent.io>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 52 ++++++++++++++++++++--
2 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f194485..abfd03a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1320,7 +1320,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setClientId(member.clientId)
.setClientHost(member.clientHost)
.setMemberAssignment(member.assignment)
- .setMemberMetadata(member.assignment)
+ .setMemberMetadata(member.metadata)
}
val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 66bed52..ddc6b2f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -20,12 +20,13 @@ package kafka.server
import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.util
+import java.util.Random
import java.util.{Collections, Optional}
import java.util.Arrays.asList
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
import kafka.controller.KafkaController
-import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.group.{GroupCoordinator, GroupSummary, MemberSummary}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.network.RequestChannel
import kafka.network.RequestChannel.SendResponse
@@ -46,14 +47,14 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._
-import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData}
+import org.apache.kafka.common.message.{DescribeGroupsRequestData, HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, SyncGroupRequestData, TxnOffsetCommitRequestData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.server.authorizer.Authorizer
-import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertNull, assertTrue}
import org.junit.{After, Test}
import scala.collection.JavaConverters._
@@ -395,6 +396,51 @@ class KafkaApisTest {
}
@Test
+ def testDescribeGroups(): Unit = {
+ val groupId = "groupId"
+ val random = new Random()
+ val metadata = new Array[Byte](10)
+ random.nextBytes(metadata)
+ val assignment = new Array[Byte](10)
+ random.nextBytes(assignment)
+
+ val memberSummary = MemberSummary("memberid", Some("instanceid"), "clientid", "clienthost", metadata, assignment)
+ val groupSummary = GroupSummary("Stable", "consumer", "roundrobin", List(memberSummary))
+
+ EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+ val (describeGroupsRequest, request) = buildRequest(new DescribeGroupsRequest.Builder(
+ new DescribeGroupsRequestData().setGroups(List(groupId).asJava)
+ ))
+
+ val capturedResponse = expectNoThrottling()
+ EasyMock.expect(groupCoordinator.handleDescribeGroup(EasyMock.eq(groupId)))
+ .andReturn((Errors.NONE, groupSummary))
+ EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+ createKafkaApis().handleDescribeGroupRequest(request)
+
+ val response = readResponse(ApiKeys.DESCRIBE_GROUPS, describeGroupsRequest, capturedResponse)
+ .asInstanceOf[DescribeGroupsResponse]
+
+ val group = response.data().groups().get(0)
+ assertEquals(Errors.NONE, Errors.forCode(group.errorCode()))
+ assertEquals(groupId, group.groupId())
+ assertEquals(groupSummary.state, group.groupState())
+ assertEquals(groupSummary.protocolType, group.protocolType())
+ assertEquals(groupSummary.protocol, group.protocolData())
+ assertEquals(groupSummary.members.size, group.members().size())
+
+ val member = group.members().get(0)
+ assertEquals(memberSummary.memberId, member.memberId())
+ assertEquals(memberSummary.groupInstanceId.orNull, member.groupInstanceId())
+ assertEquals(memberSummary.clientId, member.clientId())
+ assertEquals(memberSummary.clientHost, member.clientHost())
+ assertArrayEquals(memberSummary.metadata, member.memberMetadata())
+ assertArrayEquals(memberSummary.assignment, member.memberAssignment())
+ }
+
+ @Test
def testOffsetDeleteWithInvalidPartition(): Unit = {
val group = "groupId"
val topic = "topic"