Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/26 21:11:11 UTC
[08/10] kafka git commit: KAFKA-5059: Implement Transactional Coordinator
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
new file mode 100644
index 0000000..e814717
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.OffsetAndMetadata
+import kafka.log.LogConfig
+import kafka.message.ProducerCompressionCodec
+import kafka.server._
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Map, Seq, immutable}
+
+
+/**
+ * GroupCoordinator handles general group membership and offset management.
+ *
+ * Each Kafka server instantiates a coordinator which is responsible for a set of
+ * groups. Groups are assigned to coordinators based on their group names.
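+ * (concretely, a group maps to the coordinator for the offsets topic partition
+ * its group id hashes to; see partitionFor below)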
+ */
+class GroupCoordinator(val brokerId: Int,
+ val groupConfig: GroupConfig,
+ val offsetConfig: OffsetConfig,
+ val groupManager: GroupMetadataManager,
+ val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+ val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+ time: Time) extends Logging {
+ type JoinCallback = JoinGroupResult => Unit
+ type SyncCallback = (Array[Byte], Errors) => Unit
+
+ this.logIdent = "[GroupCoordinator " + brokerId + "]: "
+
+ private val isActive = new AtomicBoolean(false)
+
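+ // the internal offsets topic is compacted (cleanup.policy=compact) so that only the
+ // latest commit per key is retained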
+ def offsetsTopicConfigs: Properties = {
+ val props = new Properties
+ props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
+ props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
+ props
+ }
+
+ /**
+ * NOTE: If a group lock and metadataLock are simultaneously needed,
+ * be sure to acquire the group lock before metadataLock to prevent deadlock
+ */
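+
+ /**
+ * An illustrative sketch of that ordering (names as in the note above):
+ *
+ *   group synchronized {
+ *     // ... group-level work ...
+ *     metadataLock synchronized {
+ *       // ... metadata work ...
+ *     }
+ *   }
+ */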
+
+ /**
+ * Startup logic executed when the server starts up.
+ */
+ def startup(enableMetadataExpiration: Boolean = true) {
+ info("Starting up.")
+ if (enableMetadataExpiration)
+ groupManager.enableMetadataExpiration()
+ isActive.set(true)
+ info("Startup complete.")
+ }
+
+ /**
+ * Shutdown logic executed when the server shuts down.
+ * The ordering of actions should be the reverse of the startup process.
+ */
+ def shutdown() {
+ info("Shutting down.")
+ isActive.set(false)
+ groupManager.shutdown()
+ heartbeatPurgatory.shutdown()
+ joinPurgatory.shutdown()
+ info("Shutdown complete.")
+ }
+
+ def handleJoinGroup(groupId: String,
+ memberId: String,
+ clientId: String,
+ clientHost: String,
+ rebalanceTimeoutMs: Int,
+ sessionTimeoutMs: Int,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ responseCallback: JoinCallback) {
+ if (!isActive.get) {
+ responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
+ } else if (!validGroupId(groupId)) {
+ responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID))
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(joinError(memberId, Errors.NOT_COORDINATOR))
+ } else if (isCoordinatorLoadInProgress(groupId)) {
+ responseCallback(joinError(memberId, Errors.COORDINATOR_LOAD_IN_PROGRESS))
+ } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
+ sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
+ responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
+ } else {
+ // only try to create the group if the group does not yet exist AND
+ // the member id is UNKNOWN; if a member id is specified but the group
+ // does not exist, we should reject the request
+ groupManager.getGroup(groupId) match {
+ case None =>
+ if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+ } else {
+ val group = groupManager.addGroup(new GroupMetadata(groupId))
+ doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ }
+
+ case Some(group) =>
+ doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ }
+ }
+ }
+
+ private def doJoinGroup(group: GroupMetadata,
+ memberId: String,
+ clientId: String,
+ clientHost: String,
+ rebalanceTimeoutMs: Int,
+ sessionTimeoutMs: Int,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ responseCallback: JoinCallback) {
+ group synchronized {
+ if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
+ // if the new member does not support the group protocol, reject it
+ responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+ } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
+ // if the member is trying to register with an unrecognized id, send a response
+ // to let it reset its member id and retry
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+ } else {
+ group.currentState match {
+ case Dead =>
+ // if the group is marked as dead, it means some other thread has just removed the group
+ // from the coordinator metadata; it is likely that the group has migrated to another
+ // coordinator OR is in a transient unstable phase. Let the member retry
+ // joining without the specified member id.
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+
+ case PreparingRebalance =>
+ if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+ } else {
+ val member = group.get(memberId)
+ updateMemberAndRebalance(group, member, protocols, responseCallback)
+ }
+
+ case AwaitingSync =>
+ if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+ } else {
+ val member = group.get(memberId)
+ if (member.matches(protocols)) {
+ // member is joining with the same metadata (which could be because it failed to
+ // receive the initial JoinGroup response), so just return current group information
+ // for the current generation.
+ responseCallback(JoinGroupResult(
+ members = if (memberId == group.leaderId) {
+ group.currentMemberMetadata
+ } else {
+ Map.empty
+ },
+ memberId = memberId,
+ generationId = group.generationId,
+ subProtocol = group.protocol,
+ leaderId = group.leaderId,
+ error = Errors.NONE))
+ } else {
+ // member has changed metadata, so force a rebalance
+ updateMemberAndRebalance(group, member, protocols, responseCallback)
+ }
+ }
+
+ case Empty | Stable =>
+ if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ // if the member id is unknown, register the member to the group
+ addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+ } else {
+ val member = group.get(memberId)
+ if (memberId == group.leaderId || !member.matches(protocols)) {
+ // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
+ // The latter allows the leader to trigger rebalances for changes affecting assignment
+ // which do not affect the member metadata (such as topic metadata changes for the consumer)
+ updateMemberAndRebalance(group, member, protocols, responseCallback)
+ } else {
+ // for followers with no actual change to their metadata, just return group information
+ // for the current generation which will allow them to issue SyncGroup
+ responseCallback(JoinGroupResult(
+ members = Map.empty,
+ memberId = memberId,
+ generationId = group.generationId,
+ subProtocol = group.protocol,
+ leaderId = group.leaderId,
+ error = Errors.NONE))
+ }
+ }
+ }
+
+ if (group.is(PreparingRebalance))
+ joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+ }
+ }
+ }
+
+ def handleSyncGroup(groupId: String,
+ generation: Int,
+ memberId: String,
+ groupAssignment: Map[String, Array[Byte]],
+ responseCallback: SyncCallback) {
+ if (!isActive.get) {
+ responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(Array.empty, Errors.NOT_COORDINATOR)
+ } else {
+ groupManager.getGroup(groupId) match {
+ case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+ case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+ }
+ }
+ }
+
+ private def doSyncGroup(group: GroupMetadata,
+ generationId: Int,
+ memberId: String,
+ groupAssignment: Map[String, Array[Byte]],
+ responseCallback: SyncCallback) {
+ var delayedGroupStore: Option[DelayedStore] = None
+
+ group synchronized {
+ if (!group.has(memberId)) {
+ responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+ } else if (generationId != group.generationId) {
+ responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
+ } else {
+ group.currentState match {
+ case Empty | Dead =>
+ responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+
+ case PreparingRebalance =>
+ responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
+
+ case AwaitingSync =>
+ group.get(memberId).awaitingSyncCallback = responseCallback
+
+ // if this is the leader, then we can attempt to persist state and transition to stable
+ if (memberId == group.leaderId) {
+ info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
+
+ // fill any missing members with an empty assignment
+ val missing = group.allMembers -- groupAssignment.keySet
+ val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
+
+ delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
+ group synchronized {
+ // another member may have joined the group while we were awaiting this callback,
+ // so we must ensure we are still in the AwaitingSync state and the same generation
+ // when it gets invoked. If we have transitioned to another state, then do nothing
+ if (group.is(AwaitingSync) && generationId == group.generationId) {
+ if (error != Errors.NONE) {
+ resetAndPropagateAssignmentError(group, error)
+ maybePrepareRebalance(group)
+ } else {
+ setAndPropagateAssignment(group, assignment)
+ group.transitionTo(Stable)
+ }
+ }
+ }
+ })
+ }
+
+ case Stable =>
+ // if the group is stable, we just return the current assignment
+ val memberMetadata = group.get(memberId)
+ responseCallback(memberMetadata.assignment, Errors.NONE)
+ completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
+ }
+ }
+ }
+
+ // store the group metadata without holding the group lock to avoid the potential
+ // for deadlock if the callback is invoked holding other locks (e.g. the replica
+ // state change lock)
+ delayedGroupStore.foreach(groupManager.store)
+ }
+
+ def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
+ if (!isActive.get) {
+ responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(Errors.NOT_COORDINATOR)
+ } else if (isCoordinatorLoadInProgress(groupId)) {
+ responseCallback(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+ } else {
+ groupManager.getGroup(groupId) match {
+ case None =>
+ // if the group is marked as dead, it means some other thread has just removed the group
+ // from the coordinator metadata; it is likely that the group has migrated to another
+ // coordinator OR is in a transient unstable phase. Let the consumer retry
+ // joining without a specified consumer id.
+ responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+ case Some(group) =>
+ group synchronized {
+ if (group.is(Dead) || !group.has(memberId)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID)
+ } else {
+ val member = group.get(memberId)
+ removeHeartbeatForLeavingMember(group, member)
+ onMemberFailure(group, member)
+ responseCallback(Errors.NONE)
+ }
+ }
+ }
+ }
+ }
+
+ def handleHeartbeat(groupId: String,
+ memberId: String,
+ generationId: Int,
+ responseCallback: Errors => Unit) {
+ if (!isActive.get) {
+ responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(Errors.NOT_COORDINATOR)
+ } else if (isCoordinatorLoadInProgress(groupId)) {
+ // the group is still loading, so just respond blindly
+ responseCallback(Errors.NONE)
+ } else {
+ groupManager.getGroup(groupId) match {
+ case None =>
+ responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+ case Some(group) =>
+ group synchronized {
+ group.currentState match {
+ case Dead =>
+ // if the group is marked as dead, it means some other thread has just removed the group
+ // from the coordinator metadata; it is likely that the group has migrated to another
+ // coordinator OR is in a transient unstable phase. Let the member retry
+ // joining without the specified member id.
+ responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+ case Empty =>
+ responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+ case AwaitingSync =>
+ if (!group.has(memberId))
+ responseCallback(Errors.UNKNOWN_MEMBER_ID)
+ else
+ responseCallback(Errors.REBALANCE_IN_PROGRESS)
+
+ case PreparingRebalance =>
+ if (!group.has(memberId)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID)
+ } else if (generationId != group.generationId) {
+ responseCallback(Errors.ILLEGAL_GENERATION)
+ } else {
+ val member = group.get(memberId)
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ responseCallback(Errors.REBALANCE_IN_PROGRESS)
+ }
+
+ case Stable =>
+ if (!group.has(memberId)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID)
+ } else if (generationId != group.generationId) {
+ responseCallback(Errors.ILLEGAL_GENERATION)
+ } else {
+ val member = group.get(memberId)
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ responseCallback(Errors.NONE)
+ }
+ }
+ }
+ }
+ }
+ }
+
+ def handleCommitOffsets(groupId: String,
+ memberId: String,
+ generationId: Int,
+ offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
+ if (!isActive.get) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR))
+ } else if (isCoordinatorLoadInProgress(groupId)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_LOAD_IN_PROGRESS))
+ } else {
+ groupManager.getGroup(groupId) match {
+ case None =>
+ if (generationId < 0) {
+ // the group is not relying on Kafka for group management, so allow the commit
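+ // (standalone clients typically use a sentinel generation of -1 for such commits)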
+ val group = groupManager.addGroup(new GroupMetadata(groupId))
+ doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
+ } else {
+ // or this is a request coming from an older generation. Either way, reject the commit
+ responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
+ }
+
+ case Some(group) =>
+ doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
+ }
+ }
+ }
+
+ private def doCommitOffsets(group: GroupMetadata,
+ memberId: String,
+ generationId: Int,
+ offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
+ var delayedOffsetStore: Option[DelayedStore] = None
+
+ group synchronized {
+ if (group.is(Dead)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
+ } else if (generationId < 0 && group.is(Empty)) {
+ // the group is only using Kafka to store offsets
+ delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
+ offsetMetadata, responseCallback)
+ } else if (group.is(AwaitingSync)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
+ } else if (!group.has(memberId)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
+ } else if (generationId != group.generationId) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
+ } else {
+ val member = group.get(memberId)
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
+ offsetMetadata, responseCallback)
+ }
+ }
+
+ // store the offsets without holding the group lock
+ delayedOffsetStore.foreach(groupManager.store)
+ }
+
+ def handleFetchOffsets(groupId: String,
+ partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
+ if (!isActive.get)
+ (Errors.COORDINATOR_NOT_AVAILABLE, Map())
+ else if (!isCoordinatorForGroup(groupId)) {
+ debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
+ (Errors.NOT_COORDINATOR, Map())
+ } else if (isCoordinatorLoadInProgress(groupId))
+ (Errors.COORDINATOR_LOAD_IN_PROGRESS, Map())
+ else {
+ // return offsets blindly regardless of the current group state, since the group may be using
+ // Kafka commit storage without automatic group management
+ (Errors.NONE, groupManager.getOffsets(groupId, partitions))
+ }
+ }
+
+ def handleListGroups(): (Errors, List[GroupOverview]) = {
+ if (!isActive.get) {
+ (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
+ } else {
+ val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
+ (errorCode, groupManager.currentGroups.map(_.overview).toList)
+ }
+ }
+
+ def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
+ if (!isActive.get) {
+ (Errors.COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ (Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup)
+ } else if (isCoordinatorLoadInProgress(groupId)) {
+ (Errors.COORDINATOR_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
+ } else {
+ groupManager.getGroup(groupId) match {
+ case None => (Errors.NONE, GroupCoordinator.DeadGroup)
+ case Some(group) =>
+ group synchronized {
+ (Errors.NONE, group.summary)
+ }
+ }
+ }
+ }
+
+ def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
+ groupManager.cleanupGroupMetadata(Some(topicPartitions))
+ }
+
+ private def onGroupUnloaded(group: GroupMetadata) {
+ group synchronized {
+ info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
+ val previousState = group.currentState
+ group.transitionTo(Dead)
+
+ previousState match {
+ case Empty | Dead =>
+ case PreparingRebalance =>
+ for (member <- group.allMemberMetadata) {
+ if (member.awaitingJoinCallback != null) {
+ member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR))
+ member.awaitingJoinCallback = null
+ }
+ }
+ joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+
+ case Stable | AwaitingSync =>
+ for (member <- group.allMemberMetadata) {
+ if (member.awaitingSyncCallback != null) {
+ member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR)
+ member.awaitingSyncCallback = null
+ }
+ heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
+ }
+ }
+ }
+ }
+
+ private def onGroupLoaded(group: GroupMetadata) {
+ group synchronized {
+ info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
+ assert(group.is(Stable) || group.is(Empty))
+ group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
+ }
+ }
+
+ def handleGroupImmigration(offsetTopicPartitionId: Int) {
+ groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
+ }
+
+ def handleGroupEmigration(offsetTopicPartitionId: Int) {
+ groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
+ }
+
+ private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
+ assert(group.is(AwaitingSync))
+ group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
+ propagateAssignment(group, Errors.NONE)
+ }
+
+ private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
+ assert(group.is(AwaitingSync))
+ group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
+ propagateAssignment(group, error)
+ }
+
+ private def propagateAssignment(group: GroupMetadata, error: Errors) {
+ for (member <- group.allMemberMetadata) {
+ if (member.awaitingSyncCallback != null) {
+ member.awaitingSyncCallback(member.assignment, error)
+ member.awaitingSyncCallback = null
+
+ // reset the session timeout for members after propagating the member's assignment.
+ // This is because if any member's session expired while we were still awaiting either
+ // the leader sync group or the storage callback, its expiration will have been ignored
+ // and no future heartbeat expectations would be scheduled.
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ }
+ }
+ }
+
+ private def validGroupId(groupId: String): Boolean = {
+ groupId != null && !groupId.isEmpty
+ }
+
+ private def joinError(memberId: String, error: Errors): JoinGroupResult = {
+ JoinGroupResult(
+ members = Map.empty,
+ memberId = memberId,
+ generationId = 0,
+ subProtocol = GroupCoordinator.NoProtocol,
+ leaderId = GroupCoordinator.NoLeader,
+ error = error)
+ }
+
+ /**
+ * Complete existing DelayedHeartbeats for the given member and schedule the next one
+ */
+ private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+ // complete current heartbeat expectation
+ member.latestHeartbeat = time.milliseconds()
+ val memberKey = MemberKey(member.groupId, member.memberId)
+ heartbeatPurgatory.checkAndComplete(memberKey)
+
+ // reschedule the next heartbeat expiration deadline
+ val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
+ val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
+ heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
+ }
+
+ private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
+ member.isLeaving = true
+ val memberKey = MemberKey(member.groupId, member.memberId)
+ heartbeatPurgatory.checkAndComplete(memberKey)
+ }
+
+ private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
+ sessionTimeoutMs: Int,
+ clientId: String,
+ clientHost: String,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ group: GroupMetadata,
+ callback: JoinCallback) = {
+ // use the client-id with a random id suffix as the member-id
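+ // e.g. a clientId of "consumer-1" yields a member id like
+ // "consumer-1-9b2f4d1e-6c0a-4b8e-9d3f-1a2b3c4d5e6f" (illustrative UUID)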
+ val memberId = clientId + "-" + group.generateMemberIdSuffix
+ val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
+ sessionTimeoutMs, protocolType, protocols)
+ member.awaitingJoinCallback = callback
+ group.add(member)
+ maybePrepareRebalance(group)
+ member
+ }
+
+ private def updateMemberAndRebalance(group: GroupMetadata,
+ member: MemberMetadata,
+ protocols: List[(String, Array[Byte])],
+ callback: JoinCallback) {
+ member.supportedProtocols = protocols
+ member.awaitingJoinCallback = callback
+ maybePrepareRebalance(group)
+ }
+
+ private def maybePrepareRebalance(group: GroupMetadata) {
+ group synchronized {
+ if (group.canRebalance)
+ prepareRebalance(group)
+ }
+ }
+
+ private def prepareRebalance(group: GroupMetadata) {
+ // if any members are awaiting sync, cancel their request and have them rejoin
+ if (group.is(AwaitingSync))
+ resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
+
+ group.transitionTo(PreparingRebalance)
+ info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
+
+ val rebalanceTimeout = group.rebalanceTimeoutMs
+ val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
+ val groupKey = GroupKey(group.groupId)
+ joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
+ }
+
+ private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
+ trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
+ group.remove(member.memberId)
+ group.currentState match {
+ case Dead | Empty =>
+ case Stable | AwaitingSync => maybePrepareRebalance(group)
+ case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+ }
+ }
+
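+ // the delayed join can complete once every known member has rejoined; until then it
+ // remains parked in the join purgatory (expiring after the group's rebalance timeout)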
+ def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
+ group synchronized {
+ if (group.notYetRejoinedMembers.isEmpty)
+ forceComplete()
+ else false
+ }
+ }
+
+ def onExpireJoin() {
+ // TODO: add metrics for restabilize timeouts
+ }
+
+ def onCompleteJoin(group: GroupMetadata) {
+ var delayedStore: Option[DelayedStore] = None
+ group synchronized {
+ // remove any members who haven't joined the group yet
+ group.notYetRejoinedMembers.foreach { failedMember =>
+ group.remove(failedMember.memberId)
+ // TODO: cut the socket connection to the client
+ }
+
+ if (!group.is(Dead)) {
+ group.initNextGeneration()
+ if (group.is(Empty)) {
+ info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
+
+ delayedStore = groupManager.prepareStoreGroup(group, Map.empty, error => {
+ if (error != Errors.NONE) {
+ // we failed to write the empty group metadata. If the broker fails before another rebalance,
+ // the previous generation written to the log will become active again (and most likely time out).
+ // This should be safe since there are no active members in an empty generation, so we just warn.
+ warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
+ }
+ })
+ } else {
+ info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
+
+ // trigger the awaiting join group response callback for all the members after rebalancing
+ for (member <- group.allMemberMetadata) {
+ assert(member.awaitingJoinCallback != null)
+ val joinResult = JoinGroupResult(
+ members = if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
+ memberId = member.memberId,
+ generationId = group.generationId,
+ subProtocol = group.protocol,
+ leaderId = group.leaderId,
+ error = Errors.NONE)
+
+ member.awaitingJoinCallback(joinResult)
+ member.awaitingJoinCallback = null
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ }
+ }
+ }
+ }
+
+ // call without holding the group lock
+ delayedStore.foreach(groupManager.store)
+ }
+
+ def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
+ group synchronized {
+ if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
+ forceComplete()
+ else false
+ }
+ }
+
+ def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
+ group synchronized {
+ if (!shouldKeepMemberAlive(member, heartbeatDeadline))
+ onMemberFailure(group, member)
+ }
+ }
+
+ def onCompleteHeartbeat() {
+ // TODO: add metrics for complete heartbeats
+ }
+
+ def partitionFor(group: String): Int = groupManager.partitionFor(group)
+
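+ // a member should be kept alive if it has a join or sync response still in flight,
+ // or if its session deadline has not yet been reached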
+ private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
+ member.awaitingJoinCallback != null ||
+ member.awaitingSyncCallback != null ||
+ member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
+
+ private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
+
+ private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
+}
+
+object GroupCoordinator {
+
+ val NoState = ""
+ val NoProtocolType = ""
+ val NoProtocol = ""
+ val NoLeader = ""
+ val NoMembers = List[MemberSummary]()
+ val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
+ val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+
+ def apply(config: KafkaConfig,
+ zkUtils: ZkUtils,
+ replicaManager: ReplicaManager,
+ time: Time): GroupCoordinator = {
+ val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
+ val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
+ apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
+ }
+
+ private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
+ maxMetadataSize = config.offsetMetadataMaxSize,
+ loadBufferSize = config.offsetsLoadBufferSize,
+ offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
+ offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+ offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+ offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
+ offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+ offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
+ offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+ offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
+ )
+
+ def apply(config: KafkaConfig,
+ zkUtils: ZkUtils,
+ replicaManager: ReplicaManager,
+ heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+ joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+ time: Time): GroupCoordinator = {
+ val offsetConfig = this.offsetConfig(config)
+ val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
+ groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
+
+ val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
+ offsetConfig, replicaManager, zkUtils, time)
+ new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
+ }
+
+}
+
+case class GroupConfig(groupMinSessionTimeoutMs: Int,
+ groupMaxSessionTimeoutMs: Int)
+
+case class JoinGroupResult(members: Map[String, Array[Byte]],
+ memberId: String,
+ generationId: Int,
+ subProtocol: String,
+ leaderId: String,
+ error: Errors)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
new file mode 100644
index 0000000..b433284
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import java.util.UUID
+
+import kafka.common.OffsetAndMetadata
+import kafka.utils.nonthreadsafe
+import org.apache.kafka.common.TopicPartition
+
+import scala.collection.{Seq, immutable, mutable}
+
+private[group] sealed trait GroupState { def state: Byte }
+
+/**
+ * Group is preparing to rebalance
+ *
+ * action: respond to heartbeats with REBALANCE_IN_PROGRESS
+ * respond to sync group with REBALANCE_IN_PROGRESS
+ * remove member on leave group request
+ * park join group requests from new or existing members until all expected members have joined
+ * allow offset commits from previous generation
+ * allow offset fetch requests
+ * transition: some members have joined by the timeout => AwaitingSync
+ * all members have left the group => Empty
+ * group is removed by partition emigration => Dead
+ */
+private[group] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
+
+/**
+ * Group is awaiting state assignment from the leader
+ *
+ * action: respond to heartbeats with REBALANCE_IN_PROGRESS
+ * respond to offset commits with REBALANCE_IN_PROGRESS
+ * park sync group requests from followers until transition to Stable
+ * allow offset fetch requests
+ * transition: sync group with state assignment received from leader => Stable
+ * join group from new member or existing member with updated metadata => PreparingRebalance
+ * leave group from existing member => PreparingRebalance
+ * member failure detected => PreparingRebalance
+ * group is removed by partition emigration => Dead
+ */
+private[group] case object AwaitingSync extends GroupState { val state: Byte = 5 }
+
+/**
+ * Group is stable
+ *
+ * action: respond to member heartbeats normally
+ * respond to sync group from any member with current assignment
+ * respond to join group from followers with matching metadata with current group metadata
+ * allow offset commits from member of current generation
+ * allow offset fetch requests
+ * transition: member failure detected via heartbeat => PreparingRebalance
+ * leave group from existing member => PreparingRebalance
+ * leader join-group received => PreparingRebalance
+ * follower join-group with new metadata => PreparingRebalance
+ * group is removed by partition emigration => Dead
+ */
+private[group] case object Stable extends GroupState { val state: Byte = 3 }
+
+/**
+ * Group has no more members and its metadata is being removed
+ *
+ * action: respond to join group with UNKNOWN_MEMBER_ID
+ * respond to sync group with UNKNOWN_MEMBER_ID
+ * respond to heartbeat with UNKNOWN_MEMBER_ID
+ * respond to leave group with UNKNOWN_MEMBER_ID
+ * respond to offset commit with UNKNOWN_MEMBER_ID
+ * allow offset fetch requests
+ * transition: Dead is a final state before group metadata is cleaned up, so there are no transitions
+ */
+private[group] case object Dead extends GroupState { val state: Byte = 4 }
+
+/**
+ * Group has no more members, but lingers until all offsets have expired. This state
+ * also represents groups which use Kafka only for offset commits and have no members.
+ *
+ * action: respond normally to join group from new members
+ * respond to sync group with UNKNOWN_MEMBER_ID
+ * respond to heartbeat with UNKNOWN_MEMBER_ID
+ * respond to leave group with UNKNOWN_MEMBER_ID
+ * respond to offset commit with UNKNOWN_MEMBER_ID
+ * allow offset fetch requests
+ * transition: last offsets removed in periodic expiration task => Dead
+ * join group from a new member => PreparingRebalance
+ * group is removed by partition emigration => Dead
+ * group is removed by expiration => Dead
+ */
+private[group] case object Empty extends GroupState { val state: Byte = 5 }
+
+
+private object GroupMetadata {
+ private val validPreviousStates: Map[GroupState, Set[GroupState]] =
+ Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
+ AwaitingSync -> Set(PreparingRebalance),
+ Stable -> Set(AwaitingSync),
+ PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
+ Empty -> Set(PreparingRebalance))
+}
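+
+/*
+ * The same table viewed as forward transitions (informal sketch):
+ *
+ *   Empty -> PreparingRebalance -> AwaitingSync -> Stable
+ *   Stable | AwaitingSync -> PreparingRebalance (member added, changed or failed)
+ *   PreparingRebalance -> Empty (all members left)
+ *   any state -> Dead (partition emigration or group expiration)
+ */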
+
+/**
+ * Case class used to represent group metadata for the ListGroups API
+ */
+case class GroupOverview(groupId: String,
+ protocolType: String)
+
+/**
+ * Case class used to represent group metadata for the DescribeGroup API
+ */
+case class GroupSummary(state: String,
+ protocolType: String,
+ protocol: String,
+ members: List[MemberSummary])
+
+/**
+ * Group contains the following metadata:
+ *
+ * Membership metadata:
+ * 1. Members registered in this group
+ * 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
+ * 3. Protocol metadata associated with group members
+ *
+ * State metadata:
+ * 1. group state
+ * 2. generation id
+ * 3. leader id
+ */
+@nonthreadsafe
+private[group] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {
+
+ private var state: GroupState = initialState
+ private val members = new mutable.HashMap[String, MemberMetadata]
+ private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
+ private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
+
+ var protocolType: Option[String] = None
+ var generationId = 0
+ var leaderId: String = null
+ var protocol: String = null
+
+ def is(groupState: GroupState) = state == groupState
+ def not(groupState: GroupState) = state != groupState
+ def has(memberId: String) = members.contains(memberId)
+ def get(memberId: String) = members(memberId)
+
+ def add(member: MemberMetadata) {
+ if (members.isEmpty)
+ this.protocolType = Some(member.protocolType)
+
+ assert(groupId == member.groupId)
+ assert(this.protocolType.orNull == member.protocolType)
+ assert(supportsProtocols(member.protocols))
+
+ if (leaderId == null)
+ leaderId = member.memberId
+ members.put(member.memberId, member)
+ }
+
+ def remove(memberId: String) {
+ members.remove(memberId)
+ if (memberId == leaderId) {
+ leaderId = if (members.isEmpty) {
+ null
+ } else {
+ members.keys.head
+ }
+ }
+ }
+
+ def currentState = state
+
+ def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
+
+ def allMembers = members.keySet
+
+ def allMemberMetadata = members.values.toList
+
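+ // the group's rebalance timeout is the maximum of its members' rebalance timeouts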
+ def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>
+ timeout.max(member.rebalanceTimeoutMs)
+ }
+
+ // TODO: decide if ids should be predictable or random
+ def generateMemberIdSuffix = UUID.randomUUID().toString
+
+ def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
+
+ def transitionTo(groupState: GroupState) {
+ assertValidTransition(groupState)
+ state = groupState
+ }
+
+ def selectProtocol: String = {
+ if (members.isEmpty)
+ throw new IllegalStateException("Cannot select protocol for empty group")
+
+ // select the protocol for this group which is supported by all members
+ val candidates = candidateProtocols
+
+ // let each member vote for one of the protocols and choose the one with the most votes
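+ // e.g. with candidates Set("range", "roundrobin") and three members whose preferred
+ // candidate is "range", "range" and "roundrobin" respectively, "range" is selected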
+ val votes: List[(String, Int)] = allMemberMetadata
+ .map(_.vote(candidates))
+ .groupBy(identity)
+ .mapValues(_.size)
+ .toList
+
+ votes.maxBy(_._2)._1
+ }
+
+ private def candidateProtocols = {
+ // get the set of protocols that are commonly supported by all members
+ allMemberMetadata
+ .map(_.protocols)
+ .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
+ }
+
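+ // a candidate member's protocols are compatible if the group is empty or if at least
+ // one protocol is also commonly supported by all current members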
+ def supportsProtocols(memberProtocols: Set[String]) = {
+ members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
+ }
+
+ def initNextGeneration() = {
+ assert(notYetRejoinedMembers == List.empty[MemberMetadata])
+ if (members.nonEmpty) {
+ generationId += 1
+ protocol = selectProtocol
+ transitionTo(AwaitingSync)
+ } else {
+ generationId += 1
+ protocol = null
+ transitionTo(Empty)
+ }
+ }
+
+ def currentMemberMetadata: Map[String, Array[Byte]] = {
+ if (is(Dead) || is(PreparingRebalance))
+ throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
+ members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap
+ }
+
+ def summary: GroupSummary = {
+ if (is(Stable)) {
+ val members = this.members.values.map { member => member.summary(protocol) }.toList
+ GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members)
+ } else {
+ val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
+ GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members)
+ }
+ }
+
+ def overview: GroupOverview = {
+ GroupOverview(groupId, protocolType.getOrElse(""))
+ }
+
+ def initializeOffsets(offsets: collection.Map[TopicPartition, OffsetAndMetadata]) {
+ this.offsets ++= offsets
+ }
+
+ def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata) {
+ if (pendingOffsetCommits.contains(topicPartition))
+ offsets.put(topicPartition, offset)
+
+ pendingOffsetCommits.get(topicPartition) match {
+ case Some(stagedOffset) if offset == stagedOffset => pendingOffsetCommits.remove(topicPartition)
+ case _ =>
+ }
+ }
+
+ def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
+ pendingOffsetCommits.get(topicPartition) match {
+ case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)
+ case _ =>
+ }
+ }
+
+ def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
+ pendingOffsetCommits ++= offsets
+ }
+
+ def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
+ topicPartitions.flatMap { topicPartition =>
+ pendingOffsetCommits.remove(topicPartition)
+ val removedOffset = offsets.remove(topicPartition)
+ removedOffset.map(topicPartition -> _)
+ }.toMap
+ }
+
+ def removeExpiredOffsets(startMs: Long) = {
+ val expiredOffsets = offsets.filter {
+ case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
+ }
+ offsets --= expiredOffsets.keySet
+ expiredOffsets.toMap
+ }
+
+ def allOffsets = offsets.toMap
+
+ def offset(topicPartition: TopicPartition) = offsets.get(topicPartition)
+
+ def numOffsets = offsets.size
+
+ def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty
+
+ private def assertValidTransition(targetState: GroupState) {
+ if (!GroupMetadata.validPreviousStates(targetState).contains(state))
+ throw new IllegalStateException("Group %s should be in one of the states %s before moving to state %s. Instead it is in state %s"
+ .format(groupId, GroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
+ }
+
+ override def toString: String = {
+ "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
+ }
+}
+