You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2020/06/24 13:36:48 UTC
[kafka] branch 2.3 updated: KAFKA-9150; DescribeGroup uses member assignment as metadata (#8888)
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 71257a8 KAFKA-9150; DescribeGroup uses member assignment as metadata (#8888)
71257a8 is described below
commit 71257a80a901291f2064fc874cf2c7d26e45e298
Author: Dominic Evans <do...@uk.ibm.com>
AuthorDate: Wed Jun 24 14:34:30 2020 +0100
KAFKA-9150; DescribeGroup uses member assignment as metadata (#8888)
Author: David Jacot <dj...@confluent.io>
Reviewers: Jason Gustafson <ja...@confluent.io>
Closes #7658 from dajac/KAFKA-9150
(cherry picked from commit 54f8d0c3fcd9c61126e005835c4f91756cb399e5)
Signed-off-by: Dominic Evans <do...@uk.ibm.com>
Co-authored-by: David Jacot <dj...@confluent.io>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 51 ++++++++++++++++++++--
2 files changed, 49 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c883345..a873fb2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1266,7 +1266,7 @@ class KafkaApis(val requestChannel: RequestChannel,
.setClientId(member.clientId)
.setClientHost(member.clientHost)
.setMemberAssignment(member.assignment)
- .setMemberMetadata(member.assignment)
+ .setMemberMetadata(member.metadata)
}
val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index f2cdd42..c63c3b9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -20,12 +20,13 @@ package kafka.server
import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.util
+import java.util.Random
import java.util.{Collections, Optional}
import java.util.Arrays.asList
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1}
import kafka.controller.KafkaController
-import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.group.{GroupCoordinator, GroupSummary, MemberSummary}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.network.RequestChannel
import kafka.network.RequestChannel.SendResponse
@@ -48,9 +49,9 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._
-import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
+import org.apache.kafka.common.message.{DescribeGroupsRequestData, HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
-import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertNull, assertTrue}
import org.junit.{After, Test}
import scala.collection.JavaConverters._
@@ -384,6 +385,50 @@ class KafkaApisTest {
testListOffsetFailedGetLeaderReplica(Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
+ @Test
+ def testDescribeGroups(): Unit = {
+ val groupId = "groupId"
+ val random = new Random()
+ val metadata = new Array[Byte](10)
+ random.nextBytes(metadata)
+ val assignment = new Array[Byte](10)
+ random.nextBytes(assignment)
+
+ val memberSummary = MemberSummary("memberid", Some("instanceid"), "clientid", "clienthost", metadata, assignment)
+ val groupSummary = GroupSummary("Stable", "consumer", "roundrobin", List(memberSummary))
+
+ EasyMock.reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+ val (describeGroupsRequest, request) = buildRequest(new DescribeGroupsRequest.Builder(
+ new DescribeGroupsRequestData().setGroups(List(groupId).asJava)
+ ))
+
+ val capturedResponse = expectNoThrottling()
+ EasyMock.expect(groupCoordinator.handleDescribeGroup(EasyMock.eq(groupId)))
+ .andReturn((Errors.NONE, groupSummary))
+ EasyMock.replay(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
+
+ createKafkaApis().handleDescribeGroupRequest(request)
+
+ val response = readResponse(ApiKeys.DESCRIBE_GROUPS, describeGroupsRequest, capturedResponse)
+ .asInstanceOf[DescribeGroupsResponse]
+
+ val group = response.data().groups().get(0)
+ assertEquals(Errors.NONE, Errors.forCode(group.errorCode()))
+ assertEquals(groupId, group.groupId())
+ assertEquals(groupSummary.state, group.groupState())
+ assertEquals(groupSummary.protocolType, group.protocolType())
+ assertEquals(groupSummary.protocol, group.protocolData())
+ assertEquals(groupSummary.members.size, group.members().size())
+
+ val member = group.members().get(0)
+ assertEquals(memberSummary.memberId, member.memberId())
+ assertEquals(memberSummary.clientId, member.clientId())
+ assertEquals(memberSummary.clientHost, member.clientHost())
+ assertArrayEquals(memberSummary.metadata, member.memberMetadata())
+ assertArrayEquals(memberSummary.assignment, member.memberAssignment())
+ }
+
private def testListOffsetFailedGetLeaderReplica(error: Errors): Unit = {
val tp = new TopicPartition("foo", 0)
val isolationLevel = IsolationLevel.READ_UNCOMMITTED