You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:48 UTC
[4/8] kafka git commit: KAFKA-2464: client-side assignment for new
consumer
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
deleted file mode 100644
index ea1c0d0..0000000
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
-
-object ConsumerMetadataResponse {
- val CurrentVersion = 0
-
- private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1))
-
- def readFrom(buffer: ByteBuffer) = {
- val correlationId = buffer.getInt
- val errorCode = buffer.getShort
- val broker = BrokerEndPoint.readFrom(buffer)
- val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
- Some(broker)
- else
- None
-
- ConsumerMetadataResponse(coordinatorOpt, errorCode, correlationId)
- }
-
-}
-
-case class ConsumerMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
- extends RequestOrResponse() {
-
- def sizeInBytes =
- 4 + /* correlationId */
- 2 + /* error code */
- coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes
-
- def writeTo(buffer: ByteBuffer) {
- buffer.putInt(correlationId)
- buffer.putShort(errorCode)
- coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer))
- }
-
- def describe(details: Boolean) = toString
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
new file mode 100644
index 0000000..075ddb5
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupMetadataRequest.scala
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.common.ErrorMapping
+import kafka.network.{RequestOrResponseSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+
+object GroupMetadataRequest {
+ val CurrentVersion = 0.shortValue
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer) = { // decodes a request from the buffer, advancing its position
+ // envelope: version id (short), correlation id (int), client id (short string)
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = ApiUtils.readShortString(buffer)
+
+ // request body: the group name (short string)
+ val group = ApiUtils.readShortString(buffer)
+ GroupMetadataRequest(group, versionId, correlationId, clientId)
+ }
+
+}
+
+case class GroupMetadataRequest(group: String,
+ versionId: Short = GroupMetadataRequest.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = GroupMetadataRequest.DefaultClientId)
+ extends RequestOrResponse(Some(RequestKeys.GroupMetadataKey)) {
+
+ def sizeInBytes = // must account for exactly the fields writeTo emits
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ ApiUtils.shortStringLength(clientId) +
+ ApiUtils.shortStringLength(group)
+
+ def writeTo(buffer: ByteBuffer) {
+ // envelope: version id, correlation id, client id
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ ApiUtils.writeShortString(buffer, clientId)
+
+ // group metadata request body: the group name
+ ApiUtils.writeShortString(buffer, group)
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ // for any uncaught error, reply with ConsumerCoordinatorNotAvailableCode and no coordinator; 'e' itself is not inspected
+ val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+ requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
+ }
+
+ def describe(details: Boolean) = { // 'details' is unused: the output is the same either way
+ val consumerMetadataRequest = new StringBuilder
+ consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+ consumerMetadataRequest.append("; Version: " + versionId)
+ consumerMetadataRequest.append("; CorrelationId: " + correlationId)
+ consumerMetadataRequest.append("; ClientId: " + clientId)
+ consumerMetadataRequest.append("; Group: " + group)
+ consumerMetadataRequest.toString()
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
new file mode 100644
index 0000000..2d65917
--- /dev/null
+++ b/core/src/main/scala/kafka/api/GroupMetadataResponse.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+import kafka.common.ErrorMapping
+
+object GroupMetadataResponse {
+ val CurrentVersion = 0
+
+ private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1)) // placeholder endpoint used when coordinatorOpt is None
+
+ def readFrom(buffer: ByteBuffer) = { // decodes a response from the buffer, advancing its position
+ val correlationId = buffer.getInt
+ val errorCode = buffer.getShort
+ val broker = BrokerEndPoint.readFrom(buffer) // always read from the buffer, even when errorCode signals a failure
+ val coordinatorOpt = if (errorCode == ErrorMapping.NoError) // expose the broker only when there is no error
+ Some(broker)
+ else
+ None
+
+ GroupMetadataResponse(coordinatorOpt, errorCode, correlationId)
+ }
+
+}
+
+case class GroupMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int)
+ extends RequestOrResponse() {
+
+ def sizeInBytes = // must account for exactly the fields writeTo emits
+ 4 + /* correlationId */
+ 2 + /* error code */
+ coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes // falls back to the placeholder endpoint's size when None
+
+ def writeTo(buffer: ByteBuffer) {
+ buffer.putInt(correlationId)
+ buffer.putShort(errorCode)
+ coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer)) // an endpoint is always written: the placeholder when None
+ }
+
+ def describe(details: Boolean) = toString // 'details' is ignored; returns the case-class toString
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 5b362ef..75067cf 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -41,7 +41,7 @@ object OffsetCommitRequest extends Logging {
val clientId = readShortString(buffer)
// Read the OffsetRequest
- val consumerGroupId = readShortString(buffer)
+ val groupId = readShortString(buffer)
// version 1 and 2 specific fields
val groupGenerationId: Int =
@@ -50,11 +50,11 @@ object OffsetCommitRequest extends Logging {
else
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
- val consumerId: String =
+ val memberId: String =
if (versionId >= 1)
readShortString(buffer)
else
- org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
+ org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_MEMBER_ID
// version 2 specific fields
val retentionMs: Long =
@@ -83,7 +83,7 @@ object OffsetCommitRequest extends Logging {
})
})
- OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId, retentionMs)
+ OffsetCommitRequest(groupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, memberId, retentionMs)
}
}
@@ -93,7 +93,7 @@ case class OffsetCommitRequest(groupId: String,
correlationId: Int = 0,
clientId: String = OffsetCommitRequest.DefaultClientId,
groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID,
- consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID,
+ memberId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_MEMBER_ID,
retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
@@ -114,7 +114,7 @@ case class OffsetCommitRequest(groupId: String,
// version 1 and 2 specific data
if (versionId >= 1) {
buffer.putInt(groupGenerationId)
- writeShortString(buffer, consumerId)
+ writeShortString(buffer, memberId)
}
// version 2 or above specific data
@@ -142,7 +142,7 @@ case class OffsetCommitRequest(groupId: String,
4 + /* correlationId */
shortStringLength(clientId) +
shortStringLength(groupId) +
- (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) +
+ (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(memberId) else 0) +
(if (versionId >= 2) 8 /* retention time */ else 0) +
4 + /* topic count */
requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
@@ -175,7 +175,7 @@ case class OffsetCommitRequest(groupId: String,
offsetCommitRequest.append("; ClientId: " + clientId)
offsetCommitRequest.append("; GroupId: " + groupId)
offsetCommitRequest.append("; GroupGenerationId: " + groupGenerationId)
- offsetCommitRequest.append("; ConsumerId: " + consumerId)
+ offsetCommitRequest.append("; MemberId: " + memberId)
offsetCommitRequest.append("; RetentionMs: " + retentionMs)
if(details)
offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(","))
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index 8a22c1a..669b63a 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -33,10 +33,11 @@ object RequestKeys {
val ControlledShutdownKey: Short = 7
val OffsetCommitKey: Short = 8
val OffsetFetchKey: Short = 9
- val ConsumerMetadataKey: Short = 10
+ val GroupMetadataKey: Short = 10
val JoinGroupKey: Short = 11
val HeartbeatKey: Short = 12
val LeaveGroupKey: Short = 13
+ val SyncGroupKey: Short = 14
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -49,7 +50,7 @@ object RequestKeys {
ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
- ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom)
+ GroupMetadataKey -> ("GroupMetadata", GroupMetadataRequest.readFrom)
)
def nameForKey(key: Short): String = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 6ae0347..36b5b3b 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -151,9 +151,9 @@ object ClientUtils extends Logging{
if (!queryChannel.isConnected)
queryChannel = channelToAnyBroker(zkUtils)
debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
- queryChannel.send(ConsumerMetadataRequest(group))
+ queryChannel.send(GroupMetadataRequest(group))
val response = queryChannel.receive()
- val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.payload())
+ val consumerMetadataResponse = GroupMetadataResponse.readFrom(response.payload())
debug("Consumer metadata response: " + consumerMetadataResponse.toString)
if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
coordinatorOpt = consumerMetadataResponse.coordinatorOpt
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index deb48b1..bbee894 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -64,8 +64,9 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short =
object OffsetMetadataAndError {
val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code)
val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code)
- val UnknownConsumer = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_CONSUMER_ID.code)
- val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
+ val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID.code)
+ val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+ val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code)
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index db75d4b..ca41eba 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,7 +18,7 @@
package kafka.common
import util.matching.Regex
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
object Topic {
@@ -26,7 +26,7 @@ object Topic {
private val maxNameLength = 255
private val rgx = new Regex(legalChars + "+")
- val InternalTopics = Set(ConsumerCoordinator.OffsetsTopicName)
+ val InternalTopics = Set(GroupCoordinator.OffsetsTopicName)
def validate(topic: String) {
if (topic.length <= 0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index b7af6d6..5b1aead 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -112,9 +112,9 @@ class SimpleConsumer(val host: String,
TopicMetadataResponse.readFrom(response.payload())
}
- def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = {
+ def send(request: GroupMetadataRequest): GroupMetadataResponse = {
val response = sendRequest(request)
- ConsumerMetadataResponse.readFrom(response.payload())
+ GroupMetadataResponse.readFrom(response.payload())
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
deleted file mode 100644
index bf23e9b..0000000
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ /dev/null
@@ -1,535 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator
-
-import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition}
-import kafka.message.UncompressedCodec
-import kafka.log.LogConfig
-import kafka.server._
-import kafka.utils._
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.JoinGroupRequest
-
-import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.Properties
-import scala.collection.{Map, Seq, immutable}
-
-case class GroupManagerConfig(consumerMinSessionTimeoutMs: Int,
- consumerMaxSessionTimeoutMs: Int)
-
-/**
- * ConsumerCoordinator handles consumer group and consumer offset management.
- *
- * Each Kafka server instantiates a coordinator which is responsible for a set of
- * consumer groups. Consumer groups are assigned to coordinators based on their
- * group names.
- */
-class ConsumerCoordinator(val brokerId: Int,
- val groupConfig: GroupManagerConfig,
- val offsetConfig: OffsetManagerConfig,
- private val offsetManager: OffsetManager,
- zkUtils: ZkUtils) extends Logging {
-
- this.logIdent = "[ConsumerCoordinator " + brokerId + "]: "
-
- private val isActive = new AtomicBoolean(false)
-
- private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
- private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
- private var coordinatorMetadata: CoordinatorMetadata = null
-
- def this(brokerId: Int,
- groupConfig: GroupManagerConfig,
- offsetConfig: OffsetManagerConfig,
- replicaManager: ReplicaManager,
- zkUtils: ZkUtils,
- scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
- new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler), zkUtils)
-
- def offsetsTopicConfigs: Properties = {
- val props = new Properties
- props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
- props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
- props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
- props
- }
-
- /**
- * NOTE: If a group lock and metadataLock are simultaneously needed,
- * be sure to acquire the group lock before metadataLock to prevent deadlock
- */
-
- /**
- * Startup logic executed at the same time when the server starts up.
- */
- def startup() {
- info("Starting up.")
- heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
- rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId)
- coordinatorMetadata = new CoordinatorMetadata(brokerId, zkUtils, maybePrepareRebalance)
- isActive.set(true)
- info("Startup complete.")
- }
-
- /**
- * Shutdown logic executed at the same time when server shuts down.
- * Ordering of actions should be reversed from the startup process.
- */
- def shutdown() {
- info("Shutting down.")
- isActive.set(false)
- offsetManager.shutdown()
- coordinatorMetadata.shutdown()
- heartbeatPurgatory.shutdown()
- rebalancePurgatory.shutdown()
- info("Shutdown complete.")
- }
-
- def handleJoinGroup(groupId: String,
- consumerId: String,
- topics: Set[String],
- sessionTimeoutMs: Int,
- partitionAssignmentStrategy: String,
- responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) {
- if (!isActive.get) {
- responseCallback(Set.empty, consumerId, 0, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
- } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) {
- responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code)
- } else if (sessionTimeoutMs < groupConfig.consumerMinSessionTimeoutMs ||
- sessionTimeoutMs > groupConfig.consumerMaxSessionTimeoutMs) {
- responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
- } else {
- // only try to create the group if the group is not unknown AND
- // the consumer id is UNKNOWN, if consumer is specified but group does not
- // exist we should reject the request
- var group = coordinatorMetadata.getGroup(groupId)
- if (group == null) {
- if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
- responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
- } else {
- group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
- doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
- }
- } else {
- doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
- }
- }
- }
-
- private def doJoinGroup(group: ConsumerGroupMetadata,
- consumerId: String,
- topics: Set[String],
- sessionTimeoutMs: Int,
- partitionAssignmentStrategy: String,
- responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) {
- group synchronized {
- if (group.is(Dead)) {
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; this is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
- // joining without specified consumer id,
- responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
- } else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) {
- responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code)
- } else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) {
- // if the consumer trying to register with a un-recognized id, send the response to let
- // it reset its consumer id and retry
- responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
- } else if (group.has(consumerId) && group.is(Stable) && topics == group.get(consumerId).topics) {
- /*
- * if an existing consumer sends a JoinGroupRequest with no changes while the group is stable,
- * just treat it like a heartbeat and return their currently assigned partitions.
- */
- val consumer = group.get(consumerId)
- completeAndScheduleNextHeartbeatExpiration(group, consumer)
- responseCallback(consumer.assignedTopicPartitions, consumerId, group.generationId, Errors.NONE.code)
- } else {
- val consumer = if (consumerId == JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
- // if the consumer id is unknown, register this consumer to the group
- val generatedConsumerId = group.generateNextConsumerId
- val consumer = addConsumer(generatedConsumerId, topics, sessionTimeoutMs, group)
- maybePrepareRebalance(group)
- consumer
- } else {
- val consumer = group.get(consumerId)
- if (topics != consumer.topics) {
- // existing consumer changed its subscribed topics
- updateConsumer(group, consumer, topics)
- maybePrepareRebalance(group)
- consumer
- } else {
- // existing consumer rejoining a group due to rebalance
- consumer
- }
- }
-
- consumer.awaitingRebalanceCallback = responseCallback
-
- if (group.is(PreparingRebalance))
- rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
- }
- }
- }
-
- def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
- if (!isActive.get) {
- responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
- } else {
- val group = coordinatorMetadata.getGroup(groupId)
- if (group == null) {
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; this is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
- // joining without specified consumer id,
- responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
- } else {
- group synchronized {
- if (group.is(Dead)) {
- responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
- } else if (!group.has(consumerId)) {
- responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
- } else {
- val consumer = group.get(consumerId)
- removeHeartbeatForLeavingConsumer(group, consumer)
- onConsumerFailure(group, consumer)
- responseCallback(Errors.NONE.code)
- if (group.is(PreparingRebalance))
- rebalancePurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
- }
- }
- }
- }
- }
-
- def handleHeartbeat(groupId: String,
- consumerId: String,
- generationId: Int,
- responseCallback: Short => Unit) {
- if (!isActive.get) {
- responseCallback(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
- } else {
- val group = coordinatorMetadata.getGroup(groupId)
- if (group == null) {
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; this is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
- // joining without specified consumer id,
- responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
- } else {
- group synchronized {
- if (group.is(Dead)) {
- responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
- } else if (!group.has(consumerId)) {
- responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
- } else if (generationId != group.generationId) {
- responseCallback(Errors.ILLEGAL_GENERATION.code)
- } else if (!group.is(Stable)) {
- responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
- } else {
- val consumer = group.get(consumerId)
- completeAndScheduleNextHeartbeatExpiration(group, consumer)
- responseCallback(Errors.NONE.code)
- }
- }
- }
- }
- }
-
- def handleCommitOffsets(groupId: String,
- consumerId: String,
- generationId: Int,
- offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
- if (!isActive.get) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code))
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_CONSUMER.code))
- } else {
- val group = coordinatorMetadata.getGroup(groupId)
- if (group == null) {
- if (generationId < 0)
- // the group is not relying on Kafka for partition management, so allow the commit
- offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
- else
- // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
- // or this is a request coming from an older generation. either way, reject the commit
- responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
- } else {
- group synchronized {
- if (group.is(Dead)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code))
- } else if (!group.has(consumerId)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code))
- } else if (generationId != group.generationId) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
- } else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code))
- } else {
- offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
- }
- }
- }
- }
- }
-
- def handleFetchOffsets(groupId: String,
- partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
- if (!isActive.get) {
- partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
- } else if (!isCoordinatorForGroup(groupId)) {
- partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
- } else {
- val group = coordinatorMetadata.getGroup(groupId)
- if (group == null) {
- // if the group does not exist, it means this group is not relying
- // on Kafka for partition management, and hence never send join-group
- // request to the coordinator before; in this case blindly fetch the offsets
- offsetManager.getOffsets(groupId, partitions)
- } else {
- group synchronized {
- if (group.is(Dead)) {
- partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)}.toMap
- } else {
- offsetManager.getOffsets(groupId, partitions)
- }
- }
- }
- }
- }
-
- def handleGroupImmigration(offsetTopicPartitionId: Int) = {
- // TODO we may need to add more logic in KAFKA-2017
- offsetManager.loadOffsetsFromLog(offsetTopicPartitionId)
- }
-
- def handleGroupEmigration(offsetTopicPartitionId: Int) = {
- // TODO we may need to add more logic in KAFKA-2017
- offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId)
- }
-
- /**
- * Complete existing DelayedHeartbeats for the given consumer and schedule the next one
- */
- private def completeAndScheduleNextHeartbeatExpiration(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
- // complete current heartbeat expectation
- consumer.latestHeartbeat = SystemTime.milliseconds
- val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
- heartbeatPurgatory.checkAndComplete(consumerKey)
-
- // reschedule the next heartbeat expiration deadline
- val newHeartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs
- val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer, newHeartbeatDeadline, consumer.sessionTimeoutMs)
- heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey))
- }
-
- private def removeHeartbeatForLeavingConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
- consumer.isLeaving = true
- val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
- heartbeatPurgatory.checkAndComplete(consumerKey)
- }
-
- private def addConsumer(consumerId: String,
- topics: Set[String],
- sessionTimeoutMs: Int,
- group: ConsumerGroupMetadata) = {
- val consumer = new ConsumerMetadata(consumerId, group.groupId, topics, sessionTimeoutMs)
- val topicsToBind = topics -- group.topics
- group.add(consumer.consumerId, consumer)
- coordinatorMetadata.bindGroupToTopics(group.groupId, topicsToBind)
- consumer
- }
-
- private def removeConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
- group.remove(consumer.consumerId)
- val topicsToUnbind = consumer.topics -- group.topics
- coordinatorMetadata.unbindGroupFromTopics(group.groupId, topicsToUnbind)
- }
-
- private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
- val topicsToBind = topics -- group.topics
- group.remove(consumer.consumerId)
- val topicsToUnbind = consumer.topics -- (group.topics ++ topics)
- group.add(consumer.consumerId, consumer)
- consumer.topics = topics
- coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
- }
-
- private def maybePrepareRebalance(group: ConsumerGroupMetadata) {
- group synchronized {
- if (group.canRebalance)
- prepareRebalance(group)
- }
- }
-
- private def prepareRebalance(group: ConsumerGroupMetadata) {
- group.transitionTo(PreparingRebalance)
- info("Preparing to rebalance group %s with old generation %s".format(group.groupId, group.generationId))
-
- val rebalanceTimeout = group.rebalanceTimeout
- val delayedRebalance = new DelayedRebalance(this, group, rebalanceTimeout)
- val consumerGroupKey = ConsumerGroupKey(group.groupId)
- rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey))
- }
-
- private def rebalance(group: ConsumerGroupMetadata) {
- assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata])
-
- group.transitionTo(Rebalancing)
- group.generationId += 1
-
- info("Rebalancing group %s with new generation %s".format(group.groupId, group.generationId))
-
- val assignedPartitionsPerConsumer = reassignPartitions(group)
- trace("Rebalance for group %s generation %s has assigned partitions: %s"
- .format(group.groupId, group.generationId, assignedPartitionsPerConsumer))
-
- group.transitionTo(Stable)
- info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
- }
-
- private def onConsumerFailure(group: ConsumerGroupMetadata, consumer: ConsumerMetadata) {
- trace("Consumer %s in group %s has failed".format(consumer.consumerId, group.groupId))
- removeConsumer(group, consumer)
- maybePrepareRebalance(group)
- }
-
- private def reassignPartitions(group: ConsumerGroupMetadata) = {
- val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy)
- val topicsPerConsumer = group.topicsPerConsumer
- val partitionsPerTopic = coordinatorMetadata.partitionsPerTopic
- val assignedPartitionsPerConsumer = assignor.assign(topicsPerConsumer, partitionsPerTopic)
- assignedPartitionsPerConsumer.foreach { case (consumerId, partitions) =>
- group.get(consumerId).assignedTopicPartitions = partitions
- }
- assignedPartitionsPerConsumer
- }
-
- def tryCompleteRebalance(group: ConsumerGroupMetadata, forceComplete: () => Boolean) = {
- group synchronized {
- if (group.notYetRejoinedConsumers.isEmpty)
- forceComplete()
- else false
- }
- }
-
- def onExpirationRebalance() {
- // TODO: add metrics for rebalance timeouts
- }
-
- def onCompleteRebalance(group: ConsumerGroupMetadata) {
- group synchronized {
- val failedConsumers = group.notYetRejoinedConsumers
- if (group.isEmpty || !failedConsumers.isEmpty) {
- failedConsumers.foreach { failedConsumer =>
- removeConsumer(group, failedConsumer)
- // TODO: cut the socket connection to the consumer
- }
-
- if (group.isEmpty) {
- group.transitionTo(Dead)
- info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
- coordinatorMetadata.removeGroup(group.groupId, group.topics)
- }
- }
- if (!group.is(Dead)) {
- // assign partitions to existing consumers of the group according to the partitioning strategy
- rebalance(group)
-
- // trigger the awaiting join group response callback for all the consumers after rebalancing
- for (consumer <- group.allConsumers) {
- assert(consumer.awaitingRebalanceCallback != null)
- consumer.awaitingRebalanceCallback(consumer.assignedTopicPartitions, consumer.consumerId, group.generationId, Errors.NONE.code)
- consumer.awaitingRebalanceCallback = null
- completeAndScheduleNextHeartbeatExpiration(group, consumer)
- }
- }
- }
- }
-
- def tryCompleteHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
- group synchronized {
- if (shouldKeepConsumerAlive(consumer, heartbeatDeadline) || consumer.isLeaving)
- forceComplete()
- else false
- }
- }
-
- def onExpirationHeartbeat(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, heartbeatDeadline: Long) {
- group synchronized {
- if (!shouldKeepConsumerAlive(consumer, heartbeatDeadline))
- onConsumerFailure(group, consumer)
- }
- }
-
- def onCompleteHeartbeat() {
- // TODO: add metrics for complete heartbeats
- }
-
- def partitionFor(group: String): Int = offsetManager.partitionFor(group)
-
- private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) =
- consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline
-
- private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
-}
-
-object ConsumerCoordinator {
-
- val OffsetsTopicName = "__consumer_offsets"
-
- def create(config: KafkaConfig,
- zkUtils: ZkUtils,
- replicaManager: ReplicaManager,
- kafkaScheduler: KafkaScheduler): ConsumerCoordinator = {
- val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
- loadBufferSize = config.offsetsLoadBufferSize,
- offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
- offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
- offsetsTopicNumPartitions = config.offsetsTopicPartitions,
- offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
- offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
- offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
- val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
- consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
-
- new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler)
- }
-
- def create(config: KafkaConfig,
- zkUtils: ZkUtils,
- offsetManager: OffsetManager): ConsumerCoordinator = {
- val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
- loadBufferSize = config.offsetsLoadBufferSize,
- offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
- offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
- offsetsTopicNumPartitions = config.offsetsTopicPartitions,
- offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
- offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
- offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
- val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
- consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
-
- new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkUtils)
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
deleted file mode 100644
index 0e3657f..0000000
--- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.utils.nonthreadsafe
-
-import java.util.UUID
-
-import collection.mutable
-
-private[coordinator] sealed trait GroupState { def state: Byte }
-
-/**
- * Consumer group is preparing to rebalance
- *
- * action: respond to heartbeats with an ILLEGAL GENERATION error code
- * transition: some consumers have joined by the timeout => Rebalancing
- * all consumers have left the group => Dead
- */
-private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
-
-/**
- * Consumer group is rebalancing
- *
- * action: compute the group's partition assignment
- * send the join-group response with new partition assignment when rebalance is complete
- * transition: partition assignment has been computed => Stable
- */
-private[coordinator] case object Rebalancing extends GroupState { val state: Byte = 2 }
-
-/**
- * Consumer group is stable
- *
- * action: respond to consumer heartbeats normally
- * transition: consumer failure detected via heartbeat => PreparingRebalance
- * consumer join-group received => PreparingRebalance
- * zookeeper topic watcher fired => PreparingRebalance
- */
-private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
-
-/**
- * Consumer group has no more members
- *
- * action: none
- * transition: none
- */
-private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
-
-
-private object ConsumerGroupMetadata {
- private val validPreviousStates: Map[GroupState, Set[GroupState]] =
- Map(Dead -> Set(PreparingRebalance),
- Stable -> Set(Rebalancing),
- PreparingRebalance -> Set(Stable),
- Rebalancing -> Set(PreparingRebalance))
-}
-
-/**
- * Group contains the following metadata:
- *
- * Membership metadata:
- * 1. Consumers registered in this group
- * 2. Partition assignment strategy for this group
- *
- * State metadata:
- * 1. group state
- * 2. generation id
- */
-@nonthreadsafe
-private[coordinator] class ConsumerGroupMetadata(val groupId: String,
- val partitionAssignmentStrategy: String) {
-
- private val consumers = new mutable.HashMap[String, ConsumerMetadata]
- private var state: GroupState = Stable
- var generationId = 0
-
- def is(groupState: GroupState) = state == groupState
- def has(consumerId: String) = consumers.contains(consumerId)
- def get(consumerId: String) = consumers(consumerId)
-
- def add(consumerId: String, consumer: ConsumerMetadata) {
- consumers.put(consumerId, consumer)
- }
-
- def remove(consumerId: String) {
- consumers.remove(consumerId)
- }
-
- def isEmpty = consumers.isEmpty
-
- def topicsPerConsumer = consumers.mapValues(_.topics).toMap
-
- def topics = consumers.values.flatMap(_.topics).toSet
-
- def notYetRejoinedConsumers = consumers.values.filter(_.awaitingRebalanceCallback == null).toList
-
- def allConsumers = consumers.values.toList
-
- def rebalanceTimeout = consumers.values.foldLeft(0) {(timeout, consumer) =>
- timeout.max(consumer.sessionTimeoutMs)
- }
-
- // TODO: decide if ids should be predictable or random
- def generateNextConsumerId = UUID.randomUUID().toString
-
- def canRebalance = state == Stable
-
- def transitionTo(groupState: GroupState) {
- assertValidTransition(groupState)
- state = groupState
- }
-
- private def assertValidTransition(targetState: GroupState) {
- if (!ConsumerGroupMetadata.validPreviousStates(targetState).contains(state))
- throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
- .format(groupId, ConsumerGroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
deleted file mode 100644
index 64ed4a5..0000000
--- a/core/src/main/scala/kafka/coordinator/ConsumerMetadata.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.common.TopicAndPartition
-import kafka.utils.nonthreadsafe
-
-/**
- * Consumer metadata contains the following metadata:
- *
- * Heartbeat metadata:
- * 1. negotiated heartbeat session timeout
- * 2. timestamp of the latest heartbeat
- *
- * Subscription metadata:
- * 1. subscribed topics
- * 2. assigned partitions for the subscribed topics
- *
- * In addition, it also contains the following state information:
- *
- * 1. Awaiting rebalance callback: when the consumer group is in the prepare-rebalance state,
- * its rebalance callback will be kept in the metadata if the
- * consumer has sent the join group request
- */
-@nonthreadsafe
-private[coordinator] class ConsumerMetadata(val consumerId: String,
- val groupId: String,
- var topics: Set[String],
- val sessionTimeoutMs: Int) {
-
- var awaitingRebalanceCallback: (Set[TopicAndPartition], String, Int, Short) => Unit = null
- var assignedTopicPartitions = Set.empty[TopicAndPartition]
- var latestHeartbeat: Long = -1
- var isLeaving: Boolean = false
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index a33231a..2279924 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -17,11 +17,8 @@
package kafka.coordinator
-import kafka.server.KafkaConfig
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.utils.{threadsafe, ZkUtils, Logging}
-import kafka.utils.ZkUtils._
-import org.I0Itec.zkclient.{ZkClient, IZkDataListener}
+import kafka.utils.threadsafe
import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -32,9 +29,7 @@ import scala.collection.mutable
* It delegates all group logic to the callers.
*/
@threadsafe
-private[coordinator] class CoordinatorMetadata(brokerId: Int,
- zkUtils: ZkUtils,
- maybePrepareRebalance: ConsumerGroupMetadata => Unit) {
+private[coordinator] class CoordinatorMetadata(brokerId: Int) {
/**
* NOTE: If a group lock and metadataLock are simultaneously needed,
@@ -45,24 +40,11 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int,
/**
* These should be guarded by metadataLock
*/
- private val groups = new mutable.HashMap[String, ConsumerGroupMetadata]
- private val groupsPerTopic = new mutable.HashMap[String, Set[String]]
- private val topicPartitionCounts = new mutable.HashMap[String, Int]
- private val topicPartitionChangeListeners = new mutable.HashMap[String, TopicPartitionChangeListener]
+ private val groups = new mutable.HashMap[String, GroupMetadata]
def shutdown() {
inWriteLock(metadataLock) {
- topicPartitionChangeListeners.keys.foreach(deregisterTopicPartitionChangeListener)
- topicPartitionChangeListeners.clear()
groups.clear()
- groupsPerTopic.clear()
- topicPartitionCounts.clear()
- }
- }
-
- def partitionsPerTopic = {
- inReadLock(metadataLock) {
- topicPartitionCounts.toMap
}
}
@@ -78,148 +60,22 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int,
/**
* Add a group or get the group associated with the given groupId if it already exists
*/
- def addGroup(groupId: String, partitionAssignmentStrategy: String) = {
+ def addGroup(groupId: String, protocolType: String) = {
inWriteLock(metadataLock) {
- groups.getOrElseUpdate(groupId, new ConsumerGroupMetadata(groupId, partitionAssignmentStrategy))
+ groups.getOrElseUpdate(groupId, new GroupMetadata(groupId, protocolType))
}
}
/**
* Remove all metadata associated with the group, including its topics
* @param groupId the groupId of the group we are removing
- * @param topicsForGroup topics that consumers in the group were subscribed to
*/
- def removeGroup(groupId: String, topicsForGroup: Set[String]) {
+ def removeGroup(groupId: String) {
inWriteLock(metadataLock) {
- topicsForGroup.foreach(topic => unbindGroupFromTopics(groupId, topicsForGroup))
+ if (!groups.contains(groupId))
+ throw new IllegalArgumentException("Cannot remove non-existing group")
groups.remove(groupId)
}
}
- /**
- * Add the given group to the set of groups interested in
- * topic partition changes for the given topics
- */
- def bindGroupToTopics(groupId: String, topics: Set[String]) {
- inWriteLock(metadataLock) {
- require(groups.contains(groupId), "CoordinatorMetadata can only bind existing groups")
- topics.foreach(topic => bindGroupToTopic(groupId, topic))
- }
- }
-
- /**
- * Remove the given group from the set of groups interested in
- * topic partition changes for the given topics
- */
- def unbindGroupFromTopics(groupId: String, topics: Set[String]) {
- inWriteLock(metadataLock) {
- require(groups.contains(groupId), "CoordinatorMetadata can only unbind existing groups")
- topics.foreach(topic => unbindGroupFromTopic(groupId, topic))
- }
- }
-
- /**
- * Add the given group to the set of groups interested in the topicsToBind and
- * remove the given group from the set of groups interested in the topicsToUnbind
- */
- def bindAndUnbindGroupFromTopics(groupId: String, topicsToBind: Set[String], topicsToUnbind: Set[String]) {
- inWriteLock(metadataLock) {
- require(groups.contains(groupId), "CoordinatorMetadata can only update topic bindings for existing groups")
- topicsToBind.foreach(topic => bindGroupToTopic(groupId, topic))
- topicsToUnbind.foreach(topic => unbindGroupFromTopic(groupId, topic))
- }
- }
-
- private def isListeningToTopic(topic: String) = topicPartitionChangeListeners.contains(topic)
-
- private def bindGroupToTopic(groupId: String, topic: String) {
- if (isListeningToTopic(topic)) {
- val currentGroupsForTopic = groupsPerTopic(topic)
- groupsPerTopic.put(topic, currentGroupsForTopic + groupId)
- }
- else {
- groupsPerTopic.put(topic, Set(groupId))
- topicPartitionCounts.put(topic, getTopicPartitionCountFromZK(topic))
- registerTopicPartitionChangeListener(topic)
- }
- }
-
- private def unbindGroupFromTopic(groupId: String, topic: String) {
- if (isListeningToTopic(topic)) {
- val remainingGroupsForTopic = groupsPerTopic(topic) - groupId
- if (remainingGroupsForTopic.isEmpty) {
- // no other group cares about the topic, so erase all metadata associated with the topic
- groupsPerTopic.remove(topic)
- topicPartitionCounts.remove(topic)
- deregisterTopicPartitionChangeListener(topic)
- } else {
- groupsPerTopic.put(topic, remainingGroupsForTopic)
- }
- }
- }
-
- private def getTopicPartitionCountFromZK(topic: String) = {
- val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
- topicData(topic).size
- }
-
- private def registerTopicPartitionChangeListener(topic: String) {
- val listener = new TopicPartitionChangeListener
- topicPartitionChangeListeners.put(topic, listener)
- zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), listener)
- }
-
- private def deregisterTopicPartitionChangeListener(topic: String) {
- val listener = topicPartitionChangeListeners(topic)
- zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener)
- topicPartitionChangeListeners.remove(topic)
- }
-
- /**
- * Zookeeper listener to handle topic partition changes
- */
- class TopicPartitionChangeListener extends IZkDataListener with Logging {
- this.logIdent = "[TopicPartitionChangeListener on Coordinator " + brokerId + "]: "
-
- override def handleDataChange(dataPath: String, data: Object) {
- info("Handling data change for path: %s data: %s".format(dataPath, data))
- val topic = topicFromDataPath(dataPath)
- val numPartitions = getTopicPartitionCountFromZK(topic)
-
- val groupsToRebalance = inWriteLock(metadataLock) {
- /*
- * This condition exists because a consumer can leave and modify CoordinatorMetadata state
- * while ZkClient begins handling the data change but before we acquire the metadataLock.
- */
- if (isListeningToTopic(topic)) {
- topicPartitionCounts.put(topic, numPartitions)
- groupsPerTopic(topic).map(groupId => groups(groupId))
- }
- else Set.empty[ConsumerGroupMetadata]
- }
- groupsToRebalance.foreach(maybePrepareRebalance)
- }
-
- override def handleDataDeleted(dataPath: String) {
- info("Handling data delete for path: %s".format(dataPath))
- val topic = topicFromDataPath(dataPath)
- val groupsToRebalance = inWriteLock(metadataLock) {
- /*
- * This condition exists because a consumer can leave and modify CoordinatorMetadata state
- * while ZkClient begins handling the data delete but before we acquire the metadataLock.
- */
- if (isListeningToTopic(topic)) {
- topicPartitionCounts.put(topic, 0)
- groupsPerTopic(topic).map(groupId => groups(groupId))
- }
- else Set.empty[ConsumerGroupMetadata]
- }
- groupsToRebalance.foreach(maybePrepareRebalance)
- }
-
- private def topicFromDataPath(dataPath: String) = {
- val nodes = dataPath.split("/")
- nodes.last
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index 70a710c..8e250c3 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -23,13 +23,13 @@ import kafka.server.DelayedOperation
* Delayed heartbeat operations that are added to the purgatory for session timeout checking.
* Heartbeats are paused during rebalance.
*/
-private[coordinator] class DelayedHeartbeat(consumerCoordinator: ConsumerCoordinator,
- group: ConsumerGroupMetadata,
- consumer: ConsumerMetadata,
+private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
+ group: GroupMetadata,
+ member: MemberMetadata,
heartbeatDeadline: Long,
sessionTimeout: Long)
extends DelayedOperation(sessionTimeout) {
- override def tryComplete(): Boolean = consumerCoordinator.tryCompleteHeartbeat(group, consumer, heartbeatDeadline, forceComplete)
- override def onExpiration() = consumerCoordinator.onExpirationHeartbeat(group, consumer, heartbeatDeadline)
- override def onComplete() = consumerCoordinator.onCompleteHeartbeat()
+ override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
+ override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
+ override def onComplete() = coordinator.onCompleteHeartbeat()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
new file mode 100644
index 0000000..ae96e15
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed join operation that is added to the purgatory when a group is preparing to rebalance
+ *
+ * Whenever a join-group request is received, check if all known group members have requested
+ * to re-join the group; if so, complete this operation to proceed with the rebalance.
+ *
+ * When the operation has expired, any known members that have not requested to re-join
+ * the group are marked as failed, and this operation is completed to proceed with the
+ * rebalance among the rest of the group.
+ */
+private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
+ group: GroupMetadata,
+ sessionTimeout: Long)
+ extends DelayedOperation(sessionTimeout) {
+
+ override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
+ override def onExpiration() = coordinator.onExpireJoin()
+ override def onComplete() = coordinator.onCompleteJoin(group)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
deleted file mode 100644
index 8247d33..0000000
--- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.server.DelayedOperation
-
-/**
- * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
- *
- * Whenever a join-group request is received, check if all known consumers have requested
- * to re-join the group; if yes, complete this operation to proceed rebalance.
- *
- * When the operation has expired, any known consumers that have not requested to re-join
- * the group are marked as failed, and complete this operation to proceed rebalance with
- * the rest of the group.
- */
-private[coordinator] class DelayedRebalance(consumerCoordinator: ConsumerCoordinator,
- group: ConsumerGroupMetadata,
- sessionTimeout: Long)
- extends DelayedOperation(sessionTimeout) {
-
- override def tryComplete(): Boolean = consumerCoordinator.tryCompleteRebalance(group, forceComplete)
- override def onExpiration() = consumerCoordinator.onExpirationRebalance()
- override def onComplete() = consumerCoordinator.onCompleteRebalance(group)
-}