You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/03 23:40:26 UTC
[2/3] kafka git commit: KAFKA-2687: Add support for ListGroups and
DescribeGroup APIs
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
new file mode 100644
index 0000000..ddd3114
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package kafka.admin
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import kafka.common.KafkaException
+import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs}
+import org.apache.kafka.common.errors.DisconnectException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.Selector
+import org.apache.kafka.common.protocol.types.Struct
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
+import org.apache.kafka.common.{TopicPartition, Cluster, Node}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class AdminClient(val time: Time,
+ val requestTimeoutMs: Int,
+ val client: ConsumerNetworkClient,
+ val bootstrapBrokers: List[Node]) extends Logging {
+
+ private def send(target: Node,
+ api: ApiKeys,
+ request: AbstractRequest): Struct = {
+ var now = time.milliseconds()
+ val deadline = now + requestTimeoutMs
+ var future: RequestFuture[ClientResponse] = null
+
+ do {
+ future = client.send(target, api, request)
+ client.poll(future)
+
+ if (future.succeeded())
+ return if (future.value().wasDisconnected()) {
+ throw new DisconnectException()
+ } else {
+ future.value().responseBody()
+ }
+
+ now = time.milliseconds()
+ } while (now < deadline && future.exception().isInstanceOf[SendFailedException])
+
+ throw future.exception()
+ }
+
+ private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
+ bootstrapBrokers.foreach {
+ case broker =>
+ try {
+ return send(broker, api, request)
+ } catch {
+ case e: Exception =>
+ debug(s"Request ${api} failed against node ${broker}", e)
+ }
+ }
+ throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}")
+ }
+
+ private def findCoordinator(groupId: String): Node = {
+ val request = new GroupCoordinatorRequest(groupId)
+ val responseBody = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request)
+ val response = new GroupCoordinatorResponse(responseBody)
+ Errors.forCode(response.errorCode()).maybeThrow()
+ response.node()
+ }
+
+ def listGroups(node: Node): List[GroupOverview] = {
+ val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
+ val response = new ListGroupsResponse(responseBody)
+ Errors.forCode(response.errorCode()).maybeThrow()
+ response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
+ }
+
+ private def findAllBrokers(): List[Node] = {
+ val request = new MetadataRequest(List[String]())
+ val responseBody = sendAnyNode(ApiKeys.METADATA, request)
+ val response = new MetadataResponse(responseBody)
+ if (!response.errors().isEmpty)
+ debug(s"Metadata request contained errors: ${response.errors()}")
+ response.cluster().nodes().asScala.toList
+ }
+
+ def listAllGroups(): Map[Node, List[GroupOverview]] = {
+ findAllBrokers.map {
+ case broker =>
+ broker -> {
+ try {
+ listGroups(broker)
+ } catch {
+ case e: Exception =>
+ debug(s"Failed to find groups from broker ${broker}", e)
+ List[GroupOverview]()
+ }
+ }
+ }.toMap
+ }
+
+ def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = {
+ listAllGroups().mapValues { groups =>
+ groups.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+ }
+ }
+
+ def listAllGroupsFlattened(): List[GroupOverview] = {
+ listAllGroups.values.flatten.toList
+ }
+
+ def listAllConsumerGroupsFlattened(): List[GroupOverview] = {
+ listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE)
+ }
+
+ def describeGroup(groupId: String): GroupSummary = {
+ val coordinator = findCoordinator(groupId)
+ val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
+ val response = new DescribeGroupsResponse(responseBody)
+ val metadata = response.groups().get(groupId)
+ if (metadata == null)
+ throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}")
+
+ Errors.forCode(metadata.errorCode()).maybeThrow()
+ val members = metadata.members().map {
+ case member =>
+ val metadata = Utils.readBytes(member.memberMetadata())
+ val assignment = Utils.readBytes(member.memberAssignment())
+ MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment)
+ }.toList
+ GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members)
+ }
+
+ def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = {
+ val group = describeGroup(groupId)
+ if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
+ throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group")
+
+ group.members.map {
+ case member =>
+ val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
+ member.memberId -> assignment.partitions().asScala.toList
+ }.toMap
+ }
+
+}
+
+object AdminClient {
+ val DefaultConnectionMaxIdleMs = 9 * 60 * 1000
+ val DefaultRequestTimeoutMs = 5000
+ val DefaultMaxInFlightRequestsPerConnection = 100
+ val DefaultReconnectBackoffMs = 50
+ val DefaultSendBufferBytes = 128 * 1024
+ val DefaultReceiveBufferBytes = 32 * 1024
+ val DefaultRetryBackoffMs = 100
+ val AdminClientIdSequence = new AtomicInteger(1)
+ val AdminConfigDef = {
+ val config = new ConfigDef()
+ .define(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+ Type.LIST,
+ Importance.HIGH,
+ CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ .define(
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ ConfigDef.Type.STRING,
+ CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+ ConfigDef.Importance.MEDIUM,
+ CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .withClientSslSupport()
+ .withClientSaslSupport()
+ config
+ }
+
+ class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals, false)
+
+ def createSimplePlaintext(brokerUrl: String): AdminClient = {
+ val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl)
+ create(new AdminConfig(config))
+ }
+
+ def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
+
+ def create(config: AdminConfig): AdminClient = {
+ val time = new SystemTime
+ val metrics = new Metrics(time)
+ val metadata = new Metadata
+ val channelBuilder = ClientUtils.createChannelBuilder(config.values())
+
+ val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
+ val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
+ metadata.update(bootstrapCluster, 0)
+
+ val selector = new Selector(
+ DefaultConnectionMaxIdleMs,
+ metrics,
+ time,
+ "admin",
+ Map[String, String](),
+ channelBuilder)
+
+ val networkClient = new NetworkClient(
+ selector,
+ metadata,
+ "admin-" + AdminClientIdSequence.getAndIncrement(),
+ DefaultMaxInFlightRequestsPerConnection,
+ DefaultReconnectBackoffMs,
+ DefaultSendBufferBytes,
+ DefaultReceiveBufferBytes,
+ DefaultRequestTimeoutMs,
+ time)
+
+ val highLevelClient = new ConsumerNetworkClient(
+ networkClient,
+ metadata,
+ time,
+ DefaultRetryBackoffMs)
+
+ new AdminClient(
+ time,
+ DefaultRequestTimeoutMs,
+ highLevelClient,
+ bootstrapCluster.nodes().asScala.toList)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
new file mode 100644
index 0000000..43e78f5
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+
+object GroupCoordinatorRequest {
+ val CurrentVersion = 0.shortValue
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer) = {
+ // envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = ApiUtils.readShortString(buffer)
+
+ // request
+ val group = ApiUtils.readShortString(buffer)
+ GroupCoordinatorRequest(group, versionId, correlationId, clientId)
+ }
+
+}
+
+case class GroupCoordinatorRequest(group: String,
+ versionId: Short = GroupCoordinatorRequest.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = GroupCoordinatorRequest.DefaultClientId)
+ extends RequestOrResponse(Some(RequestKeys.GroupCoordinatorKey)) {
+
+ def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ ApiUtils.shortStringLength(clientId) +
+ ApiUtils.shortStringLength(group)
+
+ def writeTo(buffer: ByteBuffer) {
+ // envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ ApiUtils.writeShortString(buffer, clientId)
+
+ // consumer metadata request
+ ApiUtils.writeShortString(buffer, group)
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ // return ConsumerCoordinatorNotAvailable for all uncaught errors
+ val errorResponse = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ }
+
+ def describe(details: Boolean) = {
+ val consumerMetadataRequest = new StringBuilder
+ consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+ consumerMetadataRequest.append("; Version: " + versionId)
+ consumerMetadataRequest.append("; CorrelationId: " + correlationId)
+ consumerMetadataRequest.append("; ClientId: " + clientId)
+ consumerMetadataRequest.append("; Group: " + group)
+ consumerMetadataRequest.toString()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
new file mode 100644
index 0000000..4cd7db8
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+import kafka.common.ErrorMapping
+
+object GroupCoordinatorResponse {
+ val CurrentVersion = 0
+
+ private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
+
+ def readFrom(buffer: ByteBuffer) = {
+ val correlationId = buffer.getInt
+ val errorCode = buffer.getShort
+ val broker = BrokerEndPoint.readFrom(buffer)
+ val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
+ Some(broker)
+ else
+ None
+
+ GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId)
+ }
+
+}
+
+case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
+ extends RequestOrResponse() {
+
+ def sizeInBytes =
+ 4 + /* correlationId */
+ 2 + /* error code */
+ coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).get.sizeInBytes
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(correlationId)
+ buffer.putShort(errorCode)
+ coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
+ }
+
+ def describe(details: Boolean) = toString
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
deleted file mode 100644
index 075ddb5..0000000
--- a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.common.ErrorMapping
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-
-object GroupMetadataRequest {
- val CurrentVersion = 0.shortValue
- val DefaultClientId = ""
-
- def readFrom(buffer: ByteBuffer) = {
- // envelope
- val versionId = buffer.getShort
- val correlationId = buffer.getInt
- val clientId = ApiUtils.readShortString(buffer)
-
- // request
- val group = ApiUtils.readShortString(buffer)
- GroupMetadataRequest(group, versionId, correlationId, clientId)
- }
-
-}
-
-case class GroupMetadataRequest(group: String,
- versionId: Short = GroupMetadataRequest.CurrentVersion,
- correlationId: Int = 0,
- clientId: String = GroupMetadataRequest.DefaultClientId)
- extends RequestOrResponse(Some(RequestKeys.GroupMetadataKey)) {
-
- def sizeInBytes =
- 2 + /* versionId */
- 4 + /* correlationId */
- ApiUtils.shortStringLength(clientId) +
- ApiUtils.shortStringLength(group)
-
- def writeTo(buffer: ByteBuffer) {
- // envelope
- buffer.putShort(versionId)
- buffer.putInt(correlationId)
- ApiUtils.writeShortString(buffer, clientId)
-
- // consumer metadata request
- ApiUtils.writeShortString(buffer, group)
- }
-
- override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- // return ConsumerCoordinatorNotAvailable for all uncaught errors
- val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
- }
-
- def describe(details: Boolean) = {
- val consumerMetadataRequest = new StringBuilder
- consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
- consumerMetadataRequest.append("; Version: " + versionId)
- consumerMetadataRequest.append("; CorrelationId: " + correlationId)
- consumerMetadataRequest.append("; ClientId: " + clientId)
- consumerMetadataRequest.append("; Group: " + group)
- consumerMetadataRequest.toString()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
deleted file mode 100644
index 2d65917..0000000
--- a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
-
-object GroupMetadataResponse {
- val CurrentVersion = 0
-
- private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
-
- def readFrom(buffer: ByteBuffer) = {
- val correlationId = buffer.getInt
- val errorCode = buffer.getShort
- val broker = BrokerEndPoint.readFrom(buffer)
- val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
- Some(broker)
- else
- None
-
- GroupMetadataResponse(coordinatorOpt, errorCode, correlationId)
- }
-
-}
-
-case class GroupMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
- extends RequestOrResponse() {
-
- def sizeInBytes =
- 4 + /* correlationId */
- 2 + /* error code */
- coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putInt(correlationId)
- buffer.putShort(errorCode)
- coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
- }
-
- def describe(details: Boolean) = toString
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 669b63a..2363099 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -33,12 +33,17 @@ object RequestKeys {
val ControlledShutdownKey: Short = 7
val OffsetCommitKey: Short = 8
val OffsetFetchKey: Short = 9
- val GroupMetadataKey: Short = 10
+ val GroupCoordinatorKey: Short = 10
val JoinGroupKey: Short = 11
val HeartbeatKey: Short = 12
val LeaveGroupKey: Short = 13
val SyncGroupKey: Short = 14
+ val DescribeGroupsKey: Short = 15
+ val ListGroupsKey: Short = 16
+ // NOTE: this map only includes the server-side request/response handlers. Newer
+ // request types should only use the client-side versions which are parsed with
+ // o.a.k.common.requests.AbstractRequest.getRequest()
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
FetchKey -> ("Fetch", FetchRequest.readFrom),
@@ -49,8 +54,7 @@ object RequestKeys {
UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
- OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
- GroupMetadataKey -> ("GroupMetadata", GroupMetadataRequest.readFrom)
+ OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)
)
def nameForKey(key: Short): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 36b5b3b..2f836c0 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -151,9 +151,9 @@ object ClientUtils extends Logging{
if (!queryChannel.isConnected)
queryChannel = channelToAnyBroker(zkUtils)
debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
- queryChannel.send(GroupMetadataRequest(group))
+ queryChannel.send(GroupCoordinatorRequest(group))
val response = queryChannel.receive()
- val consumerMetadataResponse = GroupMetadataResponse.readFrom(response.payload())
+ val consumerMetadataResponse = GroupCoordinatorResponse.readFrom(response.payload())
debug("Consumer metadata response: " + consumerMetadataResponse.toString)
if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
coordinatorOpt = consumerMetadataResponse.coordinatorOpt
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 81cb51b..6f53fac 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -60,6 +60,7 @@ object ErrorMapping {
// 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
// 28: INVALID_COMMIT_OFFSET_SIZE
val AuthorizationCode: Short = 29
+ // 30: REBALANCE_IN_PROGRESS
private val exceptionToCode =
Map[Class[Throwable], Short](
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 5b1aead..e15aca4 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -112,9 +112,9 @@ class SimpleConsumer(val host: String,
TopicMetadataResponse.readFrom(response.payload())
}
- def send(request: GroupMetadataRequest): GroupMetadataResponse = {
+ def send(request: GroupCoordinatorRequest): GroupCoordinatorResponse = {
val response = sendRequest(request)
- GroupMetadataResponse.readFrom(response.payload())
+ GroupCoordinatorResponse.readFrom(response.payload())
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 97ce22b..2015371 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -107,6 +107,7 @@ class GroupCoordinator(val brokerId: Int,
def handleJoinGroup(groupId: String,
memberId: String,
clientId: String,
+ clientHost: String,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
@@ -132,10 +133,10 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
} else {
group = groupManager.addGroup(groupId, protocolType)
- doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
} else {
- doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
}
}
@@ -143,6 +144,7 @@ class GroupCoordinator(val brokerId: Int,
private def doJoinGroup(group: GroupMetadata,
memberId: String,
clientId: String,
+ clientHost: String,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
@@ -166,7 +168,7 @@ class GroupCoordinator(val brokerId: Int,
case PreparingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -174,7 +176,7 @@ class GroupCoordinator(val brokerId: Int,
case AwaitingSync =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (member.matches(protocols)) {
@@ -201,7 +203,7 @@ class GroupCoordinator(val brokerId: Int,
case Stable =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// if the member id is unknown, register the member to the group
- addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -269,13 +271,30 @@ class GroupCoordinator(val brokerId: Int,
group.get(memberId).awaitingSyncCallback = responseCallback
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
- // if this is the leader, then we can transition to stable and
- // propagate the assignment to any awaiting members
+ // if this is the leader, then we can attempt to persist state and transition to stable
if (memberId == group.leaderId) {
- group.transitionTo(Stable)
- // persist the group metadata and upon finish propagate the assignment
- groupManager.storeGroup(group, groupAssignment)
+ // fill any missing members with an empty assignment
+ val missing = group.allMembers -- groupAssignment.keySet
+ val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
+
+ // persist the group metadata and upon finish transition to stable and propagate the assignment
+ groupManager.storeGroup(group, assignment, (errorCode: Short) => {
+ group synchronized {
+ // another member may have joined the group while we were awaiting this callback,
+ // so we must ensure we are still in the AwaitingSync state when it gets invoked.
+ // if we have transitioned to another state, then we shouldn't do anything
+ if (group.is(AwaitingSync)) {
+ if (errorCode != Errors.NONE.code) {
+ resetAndPropagateAssignmentError(group, errorCode)
+ maybePrepareRebalance(group)
+ } else if (group.is(AwaitingSync)) {
+ setAndPropagateAssignment(group, assignment)
+ group.transitionTo(Stable)
+ }
+ }
+ }
+ })
}
case Stable =>
@@ -413,6 +432,34 @@ class GroupCoordinator(val brokerId: Int,
}
}
+ def handleListGroups(): (Errors, List[GroupOverview]) = {
+ if (!isActive.get) {
+ (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
+ } else {
+ val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
+ (errorCode, groupManager.currentGroups.map(_.overview).toList)
+ }
+ }
+
+ def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
+ if (!isActive.get) {
+ (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ (Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
+ } else if (isCoordinatorLoadingInProgress(groupId)) {
+ (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
+ } else {
+ val group = groupManager.getGroup(groupId)
+ if (group == null) {
+ (Errors.NONE, GroupCoordinator.DeadGroup)
+ } else {
+ group synchronized {
+ (Errors.NONE, group.summary)
+ }
+ }
+ }
+ }
+
def handleGroupImmigration(offsetTopicPartitionId: Int) = {
groupManager.loadGroupsForPartition(offsetTopicPartitionId)
}
@@ -421,6 +468,27 @@ class GroupCoordinator(val brokerId: Int,
groupManager.removeGroupsForPartition(offsetTopicPartitionId)
}
+ private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
+ assert(group.is(AwaitingSync))
+ group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
+ propagateAssignment(group, Errors.NONE.code)
+ }
+
+ private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
+ assert(group.is(AwaitingSync))
+ group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
+ propagateAssignment(group, errorCode)
+ }
+
+ private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
+ for (member <- group.allMemberMetadata) {
+ if (member.awaitingSyncCallback != null) {
+ member.awaitingSyncCallback(member.assignment, errorCode)
+ member.awaitingSyncCallback = null
+ }
+ }
+ }
+
private def validGroupId(groupId: String): Boolean = {
groupId != null && !groupId.isEmpty
}
@@ -458,12 +526,13 @@ class GroupCoordinator(val brokerId: Int,
private def addMemberAndRebalance(sessionTimeoutMs: Int,
clientId: String,
+ clientHost: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback) = {
// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix
- val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols)
+ val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
member.awaitingJoinCallback = callback
group.add(member.memberId, member)
maybePrepareRebalance(group)
@@ -488,11 +557,9 @@ class GroupCoordinator(val brokerId: Int,
private def prepareRebalance(group: GroupMetadata) {
// if any members are awaiting sync, cancel their request and have them rejoin
- if (group.is(AwaitingSync)) {
- groupManager.propagateAssignment(group, Errors.REBALANCE_IN_PROGRESS.code)
- }
+ if (group.is(AwaitingSync))
+ resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
- group.allMembers.foreach(_.assignment = null)
group.transitionTo(PreparingRebalance)
info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
@@ -544,7 +611,7 @@ class GroupCoordinator(val brokerId: Int,
info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
// trigger the awaiting join group response callback for all the members after rebalancing
- for (member <- group.allMembers) {
+ for (member <- group.allMemberMetadata) {
assert(member.awaitingJoinCallback != null)
val joinResult = JoinGroupResult(
members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
@@ -595,6 +662,11 @@ class GroupCoordinator(val brokerId: Int,
object GroupCoordinator {
+ val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
+ val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+ val NoMembers = List[MemberSummary]()
+ val NoState = ""
+ val NoProtocolType = ""
val NoProtocol = ""
val NoLeader = ""
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index 652a3a4..ece9ce0 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -93,6 +93,20 @@ private object GroupMetadata {
}
/**
+ * Case class used to represent group metadata for the ListGroups API
+ */
+case class GroupOverview(groupId: String,
+ protocolType: String)
+
+/**
+ * Case class used to represent group metadata for the DescribeGroup API
+ */
+case class GroupSummary(state: String,
+ protocolType: String,
+ protocol: String,
+ members: List[MemberSummary])
+
+/**
* Group contains the following metadata:
*
* Membership metadata:
@@ -144,7 +158,9 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
- def allMembers = members.values.toList
+ def allMembers = members.keySet
+
+ def allMemberMetadata = members.values.toList
def rebalanceTimeout = members.values.foldLeft(0) {(timeout, member) =>
timeout.max(member.sessionTimeoutMs)
@@ -168,7 +184,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
val candidates = candidateProtocols
// let each member vote for one of the protocols and choose the one with the most votes
- val votes: List[(String, Int)] = allMembers
+ val votes: List[(String, Int)] = allMemberMetadata
.map(_.vote(candidates))
.groupBy(identity)
.mapValues(_.size)
@@ -179,7 +195,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
private def candidateProtocols = {
// get the set of protocols that are commonly supported by all members
- allMembers
+ allMemberMetadata
.map(_.protocols)
.reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
}
@@ -201,6 +217,20 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap
}
+ def summary: GroupSummary = {
+ if (is(Stable)) {
+ val members = this.members.values.map{ member => member.summary(protocol) }.toList
+ GroupSummary(state.toString, protocolType, protocol, members)
+ } else {
+ val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
+ GroupSummary(state.toString, protocolType, GroupCoordinator.NoProtocol, members)
+ }
+ }
+
+ def overview: GroupOverview = {
+ GroupOverview(groupId, protocolType)
+ }
+
private def assertValidTransition(targetState: GroupState) {
if (!GroupMetadata.validPreviousStates(targetState).contains(state))
throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 81ed548..0052b5d 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -95,12 +95,16 @@ class GroupMetadataManager(val brokerId: Int,
}
)
+ def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
+
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId))
def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
+ def isLoading(): Boolean = loadingPartitions synchronized !loadingPartitions.isEmpty
+
/**
* Get the group associated with the given groupId, or null if not found
*/
@@ -158,7 +162,8 @@ class GroupMetadataManager(val brokerId: Int,
}
def storeGroup(group: GroupMetadata,
- groupAssignment: Map[String, Array[Byte]]) {
+ groupAssignment: Map[String, Array[Byte]],
+ responseCallback: Short => Unit) {
// construct the message to append
val message = new Message(
key = GroupMetadataManager.groupMetadataKey(group.groupId),
@@ -208,12 +213,7 @@ class GroupMetadataManager(val brokerId: Int,
}
}
- for (member <- group.allMembers) {
- member.assignment = groupAssignment.getOrElse(member.memberId, Array.empty[Byte])
- }
-
- // propagate the assignments
- propagateAssignment(group, responseCode)
+ responseCallback(responseCode)
}
// call replica manager to append the group message
@@ -225,16 +225,7 @@ class GroupMetadataManager(val brokerId: Int,
putCacheCallback)
}
- def propagateAssignment(group: GroupMetadata,
- errorCode: Short) {
- val hasError = errorCode != Errors.NONE.code
- for (member <- group.allMembers) {
- if (member.awaitingSyncCallback != null) {
- member.awaitingSyncCallback(if (hasError) Array.empty else member.assignment, errorCode)
- member.awaitingSyncCallback = null
- }
- }
- }
+
/**
* Store offsets by appending it to the replicated log and then inserting to cache
@@ -657,10 +648,14 @@ object GroupMetadataManager {
private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
+ new Field("client_id", STRING),
+ new Field("client_host", STRING),
new Field("session_timeout", INT32),
new Field("subscription", BYTES),
new Field("assignment", BYTES))
private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id")
+ private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id")
+ private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host")
private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout")
private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription")
private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment")
@@ -787,10 +782,12 @@ object GroupMetadataManager {
value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
- val memberArray = groupMetadata.allMembers.map {
+ val memberArray = groupMetadata.allMemberMetadata.map {
case memberMetadata =>
val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0)
memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId)
+ memberStruct.set(MEMBER_METADATA_CLIENT_ID_V0, memberMetadata.clientId)
+ memberStruct.set(MEMBER_METADATA_CLIENT_HOST_V0, memberMetadata.clientHost)
memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs)
val metadata = memberMetadata.metadata(groupMetadata.protocol)
@@ -901,10 +898,13 @@ object GroupMetadataManager {
case memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
+ val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V0).asInstanceOf[String]
+ val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V0).asInstanceOf[String]
val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int]
val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
- val member = new MemberMetadata(memberId, groupId, sessionTimeout, List((group.protocol, subscription)))
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
+ List((group.protocol, subscription)))
member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 6a76241..80782c8 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -23,6 +23,13 @@ import kafka.utils.nonthreadsafe
import scala.collection.Map
+
+case class MemberSummary(memberId: String,
+ clientId: String,
+ clientHost: String,
+ metadata: Array[Byte],
+ assignment: Array[Byte])
+
/**
* Member metadata contains the following metadata:
*
@@ -46,15 +53,14 @@ import scala.collection.Map
@nonthreadsafe
private[coordinator] class MemberMetadata(val memberId: String,
val groupId: String,
+ val clientId: String,
+ val clientHost: String,
val sessionTimeoutMs: Int,
var supportedProtocols: List[(String, Array[Byte])]) {
- // NOTE: we need to add memory barrier to assignment and awaitingSyncCallback
- // since they can be accessed in the append callback thread that does not
- // hold on the group object lock
- @volatile var assignment: Array[Byte] = null
+ var assignment: Array[Byte] = Array.empty[Byte]
var awaitingJoinCallback: JoinGroupResult => Unit = null
- @volatile var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
+ var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
var latestHeartbeat: Long = -1
var isLeaving: Boolean = false
@@ -87,6 +93,14 @@ private[coordinator] class MemberMetadata(val memberId: String,
return true
}
+ def summary(protocol: String): MemberSummary = {
+ MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
+ }
+
+ def summaryNoMetadata(): MemberSummary = {
+ MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
+ }
+
/**
* Vote for one of the potential group protocols. This takes into account the protocol preference as
* indicated by the order of supported protocols and returns the first one also contained in the set
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
new file mode 100644
index 0000000..0e14758
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+
+class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinatorResponse) {
+
+ def errorCode = underlying.errorCode
+
+ def coordinator: BrokerEndPoint = {
+ import kafka.javaapi.Implicits._
+ underlying.coordinatorOpt
+ }
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
+ this.underlying.equals(otherConsumerMetadataResponse.underlying)
+ }
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupCoordinatorResponse]
+
+ override def hashCode = underlying.hashCode
+
+ override def toString = underlying.toString
+
+}
+
+object GroupCoordinatorResponse {
+ def readFrom(buffer: ByteBuffer) = new GroupCoordinatorResponse(kafka.api.GroupCoordinatorResponse.readFrom(buffer))
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
deleted file mode 100644
index b94aa01..0000000
--- a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.javaapi
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-
-class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataResponse) {
-
- def errorCode = underlying.errorCode
-
- def coordinator: BrokerEndPoint = {
- import kafka.javaapi.Implicits._
- underlying.coordinatorOpt
- }
-
- override def equals(other: Any) = canEqual(other) && {
- val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupMetadataResponse]
- this.underlying.equals(otherConsumerMetadataResponse.underlying)
- }
-
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupMetadataResponse]
-
- override def hashCode = underlying.hashCode
-
- override def toString = underlying.toString
-
-}
-
-object GroupMetadataResponse {
- def readFrom(buffer: ByteBuffer) = new GroupMetadataResponse(kafka.api.GroupMetadataResponse.readFrom(buffer))
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9fce77e..9ea4079 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -65,7 +65,8 @@ object RequestChannel extends Logging {
RequestKeys.deserializerForKey(requestId)(buffer)
else
null
- // for client-side request / response format
+ // if we failed to find a server-side mapping, then try using the
+ // client-side request / response format
val header: RequestHeader =
if (requestObj == null) {
buffer.rewind
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 35c5956..0a2e0b9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,7 @@
package kafka.server
import java.nio.ByteBuffer
+import java.util
import kafka.admin.AdminUtils
import kafka.api._
@@ -31,13 +32,12 @@ import kafka.network.RequestChannel.{Session, Response}
import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
+import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.requests.{GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.Node
import scala.collection._
-
-
/**
* Logic to handle the various Kafka requests
*/
@@ -74,11 +74,13 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
- case RequestKeys.GroupMetadataKey => handleGroupMetadataRequest(request)
+ case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
+ case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
+ case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -676,34 +678,73 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
- /*
- * Handle a consumer metadata request
- */
- def handleGroupMetadataRequest(request: RequestChannel.Request) {
- val groupMetadataRequest = request.requestObj.asInstanceOf[GroupMetadataRequest]
+ def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
+ val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
+ val responseHeader = new ResponseHeader(request.header.correlationId)
- if (!authorize(request.session, Read, new Resource(Group, groupMetadataRequest.group))) {
- val response = GroupMetadataResponse(None, ErrorMapping.AuthorizationCode, groupMetadataRequest.correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+ if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
+ val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
- val partition = coordinator.partitionFor(groupMetadataRequest.group)
+ val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head
+ val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap {
+ partitionMetadata => partitionMetadata.leader
+ }
- val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId)
-
- val response =
- offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
- partitionMetadata.leader.map { leader =>
- GroupMetadataResponse(Some(leader), ErrorMapping.NoError, groupMetadataRequest.correlationId)
- }.getOrElse(errorResponse)
- }.getOrElse(errorResponse)
+ val responseBody = coordinatorEndpoint match {
+ case None =>
+ new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode())
+ case Some(endpoint) =>
+ new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
+ }
trace("Sending consumer metadata %s for correlation id %d to client %s."
- .format(response, groupMetadataRequest.correlationId, groupMetadataRequest.clientId))
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
+ .format(responseBody, request.header.correlationId, request.header.clientId))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+ }
+ }
+
+ def handleDescribeGroupRequest(request: RequestChannel.Request) {
+ import JavaConverters._
+
+ val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest]
+ val responseHeader = new ResponseHeader(request.header.correlationId)
+
+ val groups = describeRequest.groupIds().asScala.map {
+ case groupId =>
+ if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
+ groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED)
+ } else {
+ val (error, summary) = coordinator.handleDescribeGroup(groupId)
+ val members = summary.members.map { member =>
+ val metadata = ByteBuffer.wrap(member.metadata)
+ val assignment = ByteBuffer.wrap(member.assignment)
+ new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
+ }
+ groupId -> new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType,
+ summary.protocol, members.asJava)
+ }
+ }.toMap
+
+ val responseBody = new DescribeGroupsResponse(groups.asJava)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+ }
+
+ def handleListGroupsRequest(request: RequestChannel.Request) {
+ import JavaConverters._
+
+ val responseHeader = new ResponseHeader(request.header.correlationId)
+ val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
+ ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED)
+ } else {
+ val (error, groups) = coordinator.handleListGroups()
+ val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
+ new ListGroupsResponse(error.code, allGroups.asJava)
}
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -740,6 +781,7 @@ class KafkaApis(val requestChannel: RequestChannel,
joinGroupRequest.groupId(),
joinGroupRequest.memberId(),
request.header.clientId(),
+ request.session.host,
joinGroupRequest.sessionTimeout(),
joinGroupRequest.protocolType(),
protocols,
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
new file mode 100644
index 0000000..97b49dd
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.admin.AdminClient
+import kafka.api.IntegrationTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, Logging}
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.TopicPartition
+import org.junit.{Before, Test}
+import org.junit.Assert._
+import scala.collection.JavaConversions._
+
+class AdminClientTest extends IntegrationTestHarness with Logging {
+
+ val producerCount = 1
+ val consumerCount = 2
+ val serverCount = 3
+ val groupId = "my-test"
+ val clientId = "consumer-498"
+
+ val topic = "topic"
+ val part = 0
+ val tp = new TopicPartition(topic, part)
+ val part2 = 1
+ val tp2 = new TopicPartition(topic, part2)
+
+ var client: AdminClient = null
+
+ // configure the servers and clients
+ this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+ this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+ this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+ this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
+ this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
+ this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100")
+
+ @Before
+ override def setUp() {
+ super.setUp
+ client = AdminClient.createSimplePlaintext(this.brokerList)
+ TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
+ }
+
+ @Test
+ def testListGroups() {
+ consumers(0).subscribe(List(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumers(0).poll(0)
+ !consumers(0).assignment().isEmpty
+ }, "Expected non-empty assignment")
+
+ val groups = client.listAllGroupsFlattened
+ assertFalse(groups.isEmpty)
+ val group = groups(0)
+ assertEquals(groupId, group.groupId)
+ assertEquals("consumer", group.protocolType)
+ }
+
+ @Test
+ def testDescribeGroup() {
+ consumers(0).subscribe(List(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumers(0).poll(0)
+ !consumers(0).assignment().isEmpty
+ }, "Expected non-empty assignment")
+
+ val group= client.describeGroup(groupId)
+ assertEquals("consumer", group.protocolType)
+ assertEquals("range", group.protocol)
+ assertEquals("Stable", group.state)
+ assertFalse(group.members.isEmpty)
+
+ val member = group.members(0)
+ assertEquals(clientId, member.clientId)
+ assertFalse(member.clientHost.isEmpty)
+ assertFalse(member.memberId.isEmpty)
+ }
+
+ @Test
+ def testDescribeConsumerGroup() {
+ consumers(0).subscribe(List(topic))
+ TestUtils.waitUntilTrue(() => {
+ consumers(0).poll(0)
+ !consumers(0).assignment().isEmpty
+ }, "Expected non-empty assignment")
+
+ val assignment = client.describeConsumerGroup(groupId)
+ assertEquals(1, assignment.size)
+ for (partitions <- assignment.values)
+ assertEquals(Set(tp, tp2), partitions.toSet)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 3d484b8..e363e27 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -85,7 +85,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
RequestKeys.OffsetsKey -> classOf[ListOffsetResponse],
RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse],
RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse],
- RequestKeys.GroupMetadataKey -> classOf[GroupMetadataResponse],
+ RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse],
RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse],
RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
@@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
- RequestKeys.GroupMetadataKey -> ((resp: GroupMetadataResponse) => resp.errorCode()),
+ RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()),
RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()),
RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
@@ -121,7 +121,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
RequestKeys.OffsetsKey -> TopicDescribeAcl,
RequestKeys.OffsetCommitKey -> (TopicReadAcl ++ GroupReadAcl),
RequestKeys.OffsetFetchKey -> (TopicReadAcl ++ GroupReadAcl),
- RequestKeys.GroupMetadataKey -> (TopicReadAcl ++ GroupReadAcl),
+ RequestKeys.GroupCoordinatorKey -> (TopicReadAcl ++ GroupReadAcl),
RequestKeys.UpdateMetadataKey -> ClusterAcl,
RequestKeys.JoinGroupKey -> GroupReadAcl,
RequestKeys.SyncGroupKey -> GroupReadAcl,
@@ -174,7 +174,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava),
RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava),
RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava),
- RequestKeys.GroupMetadataKey -> new GroupMetadataRequest(group),
+ RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group),
RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue,
Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava),
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index a77979a..86e6877 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -135,8 +135,8 @@ object TestOffsetManager {
val id = random.nextInt().abs % numGroups
val group = "group-" + id
try {
- metadataChannel.send(GroupMetadataRequest(group))
- val coordinatorId = GroupMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
+ metadataChannel.send(GroupCoordinatorRequest(group))
+ val coordinatorId = GroupCoordinatorResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
val channel = if (channels.contains(coordinatorId))
channels(coordinatorId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 09e9ce3..90f629a 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -231,12 +231,12 @@ object SerializationTestUtils {
))
}
- def createConsumerMetadataRequest: GroupMetadataRequest = {
- GroupMetadataRequest("group 1", clientId = "client 1")
+ def createConsumerMetadataRequest: GroupCoordinatorRequest = {
+ GroupCoordinatorRequest("group 1", clientId = "client 1")
}
- def createConsumerMetadataResponse: GroupMetadataResponse = {
- GroupMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
+ def createConsumerMetadataResponse: GroupCoordinatorResponse = {
+ GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0)
}
def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = {
@@ -276,7 +276,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
- private val consumerMetadataResponseNoCoordinator = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
+ private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0)
private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1)
private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 5e6bd03..c1278e4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -51,10 +51,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
type LeaveGroupCallbackParams = Short
type LeaveGroupCallback = Short => Unit
+ val ClientId = "consumer-test"
+ val ClientHost = "localhost"
val ConsumerMinSessionTimeout = 10
val ConsumerMaxSessionTimeout = 1000
val DefaultSessionTimeout = 500
- var consumerCoordinator: GroupCoordinator = null
+ var groupCoordinator: GroupCoordinator = null
var replicaManager: ReplicaManager = null
var scheduler: KafkaScheduler = null
var zkUtils: ZkUtils = null
@@ -85,26 +87,25 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
EasyMock.replay(zkUtils)
- consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
- consumerCoordinator.startup()
+ groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
+ groupCoordinator.startup()
// add the partition into the owned partition list
- groupPartitionId = consumerCoordinator.partitionFor(groupId)
- consumerCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+ groupPartitionId = groupCoordinator.partitionFor(groupId)
+ groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
}
@After
def tearDown() {
EasyMock.reset(replicaManager)
- consumerCoordinator.shutdown()
+ groupCoordinator.shutdown()
}
@Test
def testJoinGroupWrongCoordinator() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType,
- protocols)
+ val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType, protocols)
val joinGroupErrorCode = joinGroupResult.errorCode
assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode)
}
@@ -139,8 +140,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val groupId = ""
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType,
- protocols)
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
assertEquals(Errors.INVALID_GROUP_ID.code, joinGroupResult.errorCode)
}
@@ -164,8 +164,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
EasyMock.reset(replicaManager)
- val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat",
- protocols)
+ val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat", protocols)
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode)
}
@@ -285,6 +284,27 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
}
@Test
+ def testSyncGroupEmptyAssignment() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val assignedConsumerId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map())
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+ assertTrue(syncGroupResult._1.isEmpty)
+
+ EasyMock.reset(replicaManager)
+ val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+ assertEquals(Errors.NONE.code, heartbeatResult)
+ }
+
+ @Test
def testSyncGroupNotCoordinator() {
val generation = 1
@@ -668,6 +688,92 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
assertEquals(Errors.NONE.code, leaveGroupResult)
}
+ @Test
+ def testListGroupsIncludesStableGroups() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val assignedMemberId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ val (error, groups) = groupCoordinator.handleListGroups()
+ assertEquals(Errors.NONE, error)
+ assertEquals(1, groups.size)
+ assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+ }
+
+ @Test
+ def testListGroupsIncludesRebalancingGroups() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ assertEquals(Errors.NONE.code, joinGroupResult.errorCode)
+
+ val (error, groups) = groupCoordinator.handleListGroups()
+ assertEquals(Errors.NONE, error)
+ assertEquals(1, groups.size)
+ assertEquals(GroupOverview("groupId", "consumer"), groups(0))
+ }
+
+ @Test
+ def testDescribeGroupWrongCoordinator() {
+ EasyMock.reset(replicaManager)
+ val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
+ assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+ }
+
+ @Test
+ def testDescribeGroupInactiveGroup() {
+ EasyMock.reset(replicaManager)
+ val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+ assertEquals(Errors.NONE, error)
+ assertEquals(GroupCoordinator.DeadGroup, summary)
+ }
+
+ @Test
+ def testDescribeGroupStable() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val assignedMemberId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+ assertEquals(Errors.NONE, error)
+ assertEquals(protocolType, summary.protocolType)
+ assertEquals("range", summary.protocol)
+ assertEquals(List(assignedMemberId), summary.members.map(_.memberId))
+ }
+
+ @Test
+ def testDescribeGroupRebalancing() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
+ assertEquals(Errors.NONE, error)
+ assertEquals(protocolType, summary.protocolType)
+ assertEquals(GroupCoordinator.NoProtocol, summary.protocol)
+ assertEquals(AwaitingSync.toString, summary.state)
+ assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId))
+ assertTrue(summary.members.forall(_.metadata.isEmpty))
+ assertTrue(summary.members.forall(_.assignment.isEmpty))
+ }
+
private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
val responsePromise = Promise[JoinGroupResult]
val responseFuture = responsePromise.future
@@ -706,7 +812,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.replay(replicaManager)
- consumerCoordinator.handleJoinGroup(groupId, memberId, "clientId", sessionTimeout, protocolType, protocols, responseCallback)
+ groupCoordinator.handleJoinGroup(groupId, memberId, "clientId", "clientHost", sessionTimeout,
+ protocolType, protocols, responseCallback)
responseFuture
}
@@ -731,7 +838,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
)})
EasyMock.replay(replicaManager)
- consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
+ groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
responseFuture
}
@@ -742,7 +849,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.replay(replicaManager)
- consumerCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
+ groupCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback)
responseFuture
}
@@ -779,7 +886,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.replay(replicaManager)
- consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
+ groupCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}
@@ -807,7 +914,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
)})
EasyMock.replay(replicaManager)
- consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
+ groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}
@@ -817,7 +924,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
EasyMock.replay(replicaManager)
- consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+ groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index 021aea6..2846622 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -146,18 +146,19 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSelectProtocol() {
val groupId = "groupId"
-
+ val clientId = "clientId"
+ val clientHost = "clientHost"
val sessionTimeoutMs = 10000
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(memberId, member)
assertEquals("range", group.selectProtocol)
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
group.add(otherMemberId, otherMember)
@@ -165,7 +166,7 @@ class GroupMetadataTest extends JUnitSuite {
assertTrue(Set("range", "roundrobin")(group.selectProtocol))
val lastMemberId = "lastMemberId"
- val lastMember = new MemberMetadata(lastMemberId, groupId, sessionTimeoutMs,
+ val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
group.add(lastMemberId, lastMember)
@@ -182,15 +183,16 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSelectProtocolChoosesCompatibleProtocol() {
val groupId = "groupId"
-
+ val clientId = "clientId"
+ val clientHost = "clientHost"
val sessionTimeoutMs = 10000
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
group.add(memberId, member)
@@ -201,14 +203,15 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testSupportsProtocols() {
val groupId = "groupId"
-
+ val clientId = "clientId"
+ val clientHost = "clientHost"
val sessionTimeoutMs = 10000
// by default, the group supports everything
assertTrue(group.supportsProtocols(Set("roundrobin", "range")))
val memberId = "memberId"
- val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs,
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
group.add(memberId, member)
@@ -217,7 +220,7 @@ class GroupMetadataTest extends JUnitSuite {
assertFalse(group.supportsProtocols(Set("foo", "bar")))
val otherMemberId = "otherMemberId"
- val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs,
+ val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs,
List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
group.add(otherMemberId, otherMember)