You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/26 21:11:11 UTC

[08/10] kafka git commit: KAFKA-5059: Implement Transactional Coordinator

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
new file mode 100644
index 0000000..e814717
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.OffsetAndMetadata
+import kafka.log.LogConfig
+import kafka.message.ProducerCompressionCodec
+import kafka.server._
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.{Map, Seq, immutable}
+
+
+/**
+ * GroupCoordinator handles general group membership and offset management.
+ *
+ * Each Kafka server instantiates a coordinator which is responsible for a set of
+ * groups. Groups are assigned to coordinators based on their group names.
+ */
+class GroupCoordinator(val brokerId: Int,
+                       val groupConfig: GroupConfig,
+                       val offsetConfig: OffsetConfig,
+                       val groupManager: GroupMetadataManager,
+                       val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+                       val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+                       time: Time) extends Logging {
+  // Callback through which the outcome of a JoinGroup request is handed back to the caller.
+  type JoinCallback = JoinGroupResult => Unit
+  // Callback through which the outcome of a SyncGroup request (assignment bytes plus error) is handed back.
+  type SyncCallback = (Array[Byte], Errors) => Unit
+
+  this.logIdent = "[GroupCoordinator " + brokerId + "]: "
+
+  // Set by startup() and cleared by shutdown(); most handle* methods check it before doing any work.
+  private val isActive = new AtomicBoolean(false)
+
+  /**
+   * Builds the log configuration for the offsets topic: compact cleanup policy, the segment
+   * size taken from `offsetConfig`, and the `ProducerCompressionCodec` compression type.
+   *
+   * @return a freshly created `Properties` instance holding those three settings
+   */
+  def offsetsTopicConfigs: Properties = {
+    val topicConfig = new Properties
+    val segmentBytes = offsetConfig.offsetsTopicSegmentBytes.toString
+    topicConfig.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    topicConfig.put(LogConfig.SegmentBytesProp, segmentBytes)
+    topicConfig.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
+    topicConfig
+  }
+
+  /**
+   * NOTE: If a group lock and metadataLock are simultaneously needed,
+   * be sure to acquire the group lock before metadataLock to prevent deadlock
+   */
+
+  /**
+   * Startup logic executed at the same time when the server starts up.
+   */
+  def startup(enableMetadataExpiration: Boolean = true) {
+    info("Starting up.")
+    // metadata expiration is opt-out: callers may start the coordinator without it
+    if (enableMetadataExpiration) {
+      groupManager.enableMetadataExpiration()
+    }
+    // once the flag is set the handlers stop answering COORDINATOR_NOT_AVAILABLE
+    isActive.set(true)
+    info("Startup complete.")
+  }
+
+  /**
+   * Shutdown logic executed at the same time when server shuts down.
+   * Ordering of actions should be reversed from the startup process.
+   */
+  def shutdown() {
+    info("Shutting down.")
+
+    // flip the flag first so that the handlers reject new requests with COORDINATOR_NOT_AVAILABLE
+    isActive.set(false)
+
+    // then stop the metadata manager, followed by the heartbeat and join purgatories
+    groupManager.shutdown()
+    heartbeatPurgatory.shutdown()
+    joinPurgatory.shutdown()
+
+    info("Shutdown complete.")
+  }
+
+  /**
+   * Entry point for a JoinGroup request.
+   *
+   * The request is validated in this order: coordinator active, group id valid, this broker
+   * is the coordinator for the group, the coordinator is not still loading, and the session
+   * timeout lies within the configured bounds. The first failed check is reported through
+   * `responseCallback`. Otherwise the join is delegated to `doJoinGroup`, creating the group
+   * first when it does not exist and the request carries no member id.
+   */
+  def handleJoinGroup(groupId: String,
+                      memberId: String,
+                      clientId: String,
+                      clientHost: String,
+                      rebalanceTimeoutMs: Int,
+                      sessionTimeoutMs: Int,
+                      protocolType: String,
+                      protocols: List[(String, Array[Byte])],
+                      responseCallback: JoinCallback) {
+    val rejection: Option[Errors] =
+      if (!isActive.get)
+        Some(Errors.COORDINATOR_NOT_AVAILABLE)
+      else if (!validGroupId(groupId))
+        Some(Errors.INVALID_GROUP_ID)
+      else if (!isCoordinatorForGroup(groupId))
+        Some(Errors.NOT_COORDINATOR)
+      else if (isCoordinatorLoadInProgress(groupId))
+        Some(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+      else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
+               sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs)
+        Some(Errors.INVALID_SESSION_TIMEOUT)
+      else
+        None
+
+    rejection match {
+      case Some(error) =>
+        responseCallback(joinError(memberId, error))
+
+      case None =>
+        val existingGroup = groupManager.getGroup(groupId)
+        if (existingGroup.isEmpty && memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+          // a member id was supplied for a group this coordinator does not know: reject
+          // the request instead of creating the group
+          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+        } else {
+          // a new group is only created when the group is unknown AND the member id is UNKNOWN
+          val group = existingGroup.getOrElse(groupManager.addGroup(new GroupMetadata(groupId)))
+          doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
+        }
+    }
+  }
+
+  /**
+   * Processes a validated JoinGroup request while holding the group lock.
+   *
+   * The request is rejected with INCONSISTENT_GROUP_PROTOCOL when the group is not Empty and
+   * either its protocol type differs from `protocolType` or `group.supportsProtocols` returns
+   * false for the offered protocol names, and with UNKNOWN_MEMBER_ID when a member id is given
+   * that the group does not contain. Otherwise the outcome depends on the current group state:
+   * the member is added or updated (which may start a rebalance), or the current generation's
+   * information is returned directly through `responseCallback`.
+   *
+   * If the group is in PreparingRebalance afterwards, the join purgatory is asked to check
+   * whether the pending operation for this group can be completed.
+   */
+  private def doJoinGroup(group: GroupMetadata,
+                          memberId: String,
+                          clientId: String,
+                          clientHost: String,
+                          rebalanceTimeoutMs: Int,
+                          sessionTimeoutMs: Int,
+                          protocolType: String,
+                          protocols: List[(String, Array[Byte])],
+                          responseCallback: JoinCallback) {
+    group synchronized {
+      if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
+        // if the new member does not support the group protocol, reject it
+        responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
+      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
+        // if the member trying to register with a un-recognized id, send the response to let
+        // it reset its member id and retry
+        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+      } else {
+        group.currentState match {
+          case Dead =>
+            // if the group is marked as dead, it means some other thread has just removed the group
+            // from the coordinator metadata; this is likely that the group has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let the member retry
+            // joining without the specified member id,
+            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
+
+          case PreparingRebalance =>
+            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+            } else {
+              val member = group.get(memberId)
+              updateMemberAndRebalance(group, member, protocols, responseCallback)
+            }
+
+          case AwaitingSync =>
+            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+            } else {
+              val member = group.get(memberId)
+              if (member.matches(protocols)) {
+                // member is joining with the same metadata (which could be because it failed to
+                // receive the initial JoinGroup response), so just return current group information
+                // for the current generation.
+                responseCallback(JoinGroupResult(
+                  members = if (memberId == group.leaderId) {
+                    group.currentMemberMetadata
+                  } else {
+                    Map.empty
+                  },
+                  memberId = memberId,
+                  generationId = group.generationId,
+                  subProtocol = group.protocol,
+                  leaderId = group.leaderId,
+                  error = Errors.NONE))
+              } else {
+                // member has changed metadata, so force a rebalance
+                updateMemberAndRebalance(group, member, protocols, responseCallback)
+              }
+            }
+
+          case Empty | Stable =>
+            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+              // if the member id is unknown, register the member to the group
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+            } else {
+              val member = group.get(memberId)
+              if (memberId == group.leaderId || !member.matches(protocols)) {
+                // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
+                // The latter allows the leader to trigger rebalances for changes affecting assignment
+                // which do not affect the member metadata (such as topic metadata changes for the consumer)
+                updateMemberAndRebalance(group, member, protocols, responseCallback)
+              } else {
+                // for followers with no actual change to their metadata, just return group information
+                // for the current generation which will allow them to issue SyncGroup
+                responseCallback(JoinGroupResult(
+                  members = Map.empty,
+                  memberId = memberId,
+                  generationId = group.generationId,
+                  subProtocol = group.protocol,
+                  leaderId = group.leaderId,
+                  error = Errors.NONE))
+              }
+            }
+        }
+
+        // the member just added or updated may have been the last one the pending join was waiting for
+        if (group.is(PreparingRebalance))
+          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      }
+    }
+  }
+
+  /**
+   * Entry point for a SyncGroup request.
+   *
+   * Answers COORDINATOR_NOT_AVAILABLE when the coordinator is inactive, NOT_COORDINATOR when
+   * this broker does not own the group, and UNKNOWN_MEMBER_ID when the group is not known here.
+   * In every error case an empty assignment is returned. Otherwise the request is delegated
+   * to `doSyncGroup`.
+   */
+  def handleSyncGroup(groupId: String,
+                      generation: Int,
+                      memberId: String,
+                      groupAssignment: Map[String, Array[Byte]],
+                      responseCallback: SyncCallback) {
+    if (!isActive.get)
+      responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
+    else if (!isCoordinatorForGroup(groupId))
+      responseCallback(Array.empty, Errors.NOT_COORDINATOR)
+    else
+      groupManager.getGroup(groupId) match {
+        case Some(group) =>
+          doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+        case None =>
+          responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+      }
+  }
+
+  /**
+   * Processes a SyncGroup request for a known group.
+   *
+   * Under the group lock the request is rejected with UNKNOWN_MEMBER_ID (unknown member, or
+   * group Empty/Dead), ILLEGAL_GENERATION (generation mismatch) or REBALANCE_IN_PROGRESS
+   * (group in PreparingRebalance). In AwaitingSync the callback is parked on the member; when
+   * the caller is the leader, members missing from `groupAssignment` get an empty assignment
+   * and a group store is prepared. In Stable the member's current assignment is returned and
+   * its heartbeat expiration is rescheduled.
+   *
+   * The prepared store, if any, is executed after the group lock has been released.
+   */
+  private def doSyncGroup(group: GroupMetadata,
+                          generationId: Int,
+                          memberId: String,
+                          groupAssignment: Map[String, Array[Byte]],
+                          responseCallback: SyncCallback) {
+    // filled in only when the leader's assignment has to be persisted
+    var delayedGroupStore: Option[DelayedStore] = None
+
+    group synchronized {
+      if (!group.has(memberId)) {
+        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+      } else if (generationId != group.generationId) {
+        responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
+      } else {
+        group.currentState match {
+          case Empty | Dead =>
+            responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
+
+          case PreparingRebalance =>
+            responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
+
+          case AwaitingSync =>
+            // park the callback; it is answered once the assignment is propagated
+            group.get(memberId).awaitingSyncCallback = responseCallback
+
+            // if this is the leader, then we can attempt to persist state and transition to stable
+            if (memberId == group.leaderId) {
+              info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
+
+              // fill any missing members with an empty assignment
+              val missing = group.allMembers -- groupAssignment.keySet
+              val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
+
+              delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
+                group synchronized {
+                  // another member may have joined the group while we were awaiting this callback,
+                  // so we must ensure we are still in the AwaitingSync state and the same generation
+                  // when it gets invoked. if we have transitioned to another state, then do nothing
+                  if (group.is(AwaitingSync) && generationId == group.generationId) {
+                    if (error != Errors.NONE) {
+                      resetAndPropagateAssignmentError(group, error)
+                      maybePrepareRebalance(group)
+                    } else {
+                      setAndPropagateAssignment(group, assignment)
+                      group.transitionTo(Stable)
+                    }
+                  }
+                }
+              })
+            }
+
+          case Stable =>
+            // if the group is stable, we just return the current assignment
+            val memberMetadata = group.get(memberId)
+            responseCallback(memberMetadata.assignment, Errors.NONE)
+            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
+        }
+      }
+    }
+
+    // store the group metadata without holding the group lock to avoid the potential
+    // for deadlock if the callback is invoked holding other locks (e.g. the replica
+    // state change lock)
+    delayedGroupStore.foreach(groupManager.store)
+  }
+
+  /**
+   * Entry point for a LeaveGroup request.
+   *
+   * Coordinator-level problems (inactive, not the coordinator, still loading) are reported
+   * first. UNKNOWN_MEMBER_ID is returned when the group is not known to this coordinator, is
+   * Dead, or does not contain `memberId`. Otherwise the member's pending heartbeat is
+   * completed, the member is removed via `onMemberFailure`, and NONE is returned.
+   */
+  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
+    val coordinatorError: Option[Errors] =
+      if (!isActive.get)
+        Some(Errors.COORDINATOR_NOT_AVAILABLE)
+      else if (!isCoordinatorForGroup(groupId))
+        Some(Errors.NOT_COORDINATOR)
+      else if (isCoordinatorLoadInProgress(groupId))
+        Some(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+      else
+        None
+
+    coordinatorError match {
+      case Some(error) =>
+        responseCallback(error)
+
+      case None =>
+        groupManager.getGroup(groupId) match {
+          case Some(group) =>
+            group synchronized {
+              val isLiveMember = !group.is(Dead) && group.has(memberId)
+              if (isLiveMember) {
+                val leavingMember = group.get(memberId)
+                removeHeartbeatForLeavingMember(group, leavingMember)
+                onMemberFailure(group, leavingMember)
+                responseCallback(Errors.NONE)
+              } else {
+                responseCallback(Errors.UNKNOWN_MEMBER_ID)
+              }
+            }
+
+          case None =>
+            // this coordinator holds no metadata for the group, so the member id cannot be
+            // known here either; answer with UNKNOWN_MEMBER_ID
+            responseCallback(Errors.UNKNOWN_MEMBER_ID)
+        }
+    }
+  }
+
+  /**
+   * Entry point for a Heartbeat request.
+   *
+   * While the group is still loading the heartbeat is acknowledged with NONE without further
+   * checks. For an unknown, Dead or Empty group the answer is UNKNOWN_MEMBER_ID. In
+   * AwaitingSync a known member is told REBALANCE_IN_PROGRESS. In PreparingRebalance and
+   * Stable the member and generation are validated, the heartbeat expiration is rescheduled,
+   * and REBALANCE_IN_PROGRESS or NONE respectively is returned.
+   */
+  def handleHeartbeat(groupId: String,
+                      memberId: String,
+                      generationId: Int,
+                      responseCallback: Errors => Unit) {
+    // shared by the PreparingRebalance and Stable states; must be called under the group lock
+    def validateAndRefresh(group: GroupMetadata, onAccepted: Errors): Unit = {
+      if (!group.has(memberId)) {
+        responseCallback(Errors.UNKNOWN_MEMBER_ID)
+      } else if (generationId != group.generationId) {
+        responseCallback(Errors.ILLEGAL_GENERATION)
+      } else {
+        completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
+        responseCallback(onAccepted)
+      }
+    }
+
+    if (!isActive.get)
+      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
+    else if (!isCoordinatorForGroup(groupId))
+      responseCallback(Errors.NOT_COORDINATOR)
+    else if (isCoordinatorLoadInProgress(groupId))
+      // the group is still loading, so respond just blindly
+      responseCallback(Errors.NONE)
+    else
+      groupManager.getGroup(groupId) match {
+        case None =>
+          responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+        case Some(group) =>
+          group synchronized {
+            group.currentState match {
+              case Dead | Empty =>
+                // a Dead group has just been removed from the coordinator metadata (it may have
+                // migrated to another coordinator) and an Empty group has no members; in both
+                // cases let the member rejoin without its current member id
+                responseCallback(Errors.UNKNOWN_MEMBER_ID)
+
+              case AwaitingSync =>
+                val answer =
+                  if (group.has(memberId)) Errors.REBALANCE_IN_PROGRESS
+                  else Errors.UNKNOWN_MEMBER_ID
+                responseCallback(answer)
+
+              case PreparingRebalance =>
+                validateAndRefresh(group, Errors.REBALANCE_IN_PROGRESS)
+
+              case Stable =>
+                validateAndRefresh(group, Errors.NONE)
+            }
+          }
+      }
+  }
+
+  /**
+   * Entry point for an OffsetCommit request.
+   *
+   * Coordinator-level problems (inactive, not the coordinator, still loading) fail every
+   * partition in `offsetMetadata` with the corresponding error. When the group is unknown, the
+   * commit is accepted only if `generationId` is negative, in which case the group is created
+   * first; otherwise every partition fails with ILLEGAL_GENERATION. Known groups are handed to
+   * `doCommitOffsets`.
+   *
+   * @param offsetMetadata   offsets to commit, keyed by partition
+   * @param responseCallback receives one error code per partition of `offsetMetadata`
+   */
+  def handleCommitOffsets(groupId: String,
+                          memberId: String,
+                          generationId: Int,
+                          offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+    // Builds a strict map: `mapValues` would hand the callback a lazy view that re-applies
+    // the function on every lookup.
+    def failAll(error: Errors): Unit =
+      responseCallback(offsetMetadata.map { case (partition, _) => partition -> error })
+
+    if (!isActive.get) {
+      failAll(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isCoordinatorForGroup(groupId)) {
+      failAll(Errors.NOT_COORDINATOR)
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      failAll(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+    } else {
+      groupManager.getGroup(groupId) match {
+        case None =>
+          if (generationId < 0) {
+            // the group is not relying on Kafka for group management, so allow the commit
+            val group = groupManager.addGroup(new GroupMetadata(groupId))
+            doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
+          } else {
+            // or this is a request coming from an older generation. either way, reject the commit
+            failAll(Errors.ILLEGAL_GENERATION)
+          }
+
+        case Some(group) =>
+          doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
+      }
+    }
+  }
+
+  /**
+   * Validates an offset commit against the group state and, if accepted, stores the offsets.
+   *
+   * Under the group lock: a Dead group fails every partition with UNKNOWN_MEMBER_ID; a negative
+   * generation on an Empty group is accepted without membership checks; a group in AwaitingSync
+   * fails with REBALANCE_IN_PROGRESS; an unknown member fails with UNKNOWN_MEMBER_ID; a
+   * generation mismatch fails with ILLEGAL_GENERATION. Otherwise the member's heartbeat
+   * expiration is rescheduled and the offset store is prepared.
+   *
+   * The prepared store, if any, is executed after the group lock has been released.
+   */
+  private def doCommitOffsets(group: GroupMetadata,
+                              memberId: String,
+                              generationId: Int,
+                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+    // Builds a strict map: `mapValues` would hand the callback a lazy view that re-applies
+    // the function on every lookup.
+    def failAll(error: Errors): Unit =
+      responseCallback(offsetMetadata.map { case (partition, _) => partition -> error })
+
+    val delayedOffsetStore: Option[DelayedStore] = group synchronized {
+      if (group.is(Dead)) {
+        failAll(Errors.UNKNOWN_MEMBER_ID)
+        None
+      } else if (generationId < 0 && group.is(Empty)) {
+        // the group is only using Kafka to store offsets
+        groupManager.prepareStoreOffsets(group, memberId, generationId,
+          offsetMetadata, responseCallback)
+      } else if (group.is(AwaitingSync)) {
+        failAll(Errors.REBALANCE_IN_PROGRESS)
+        None
+      } else if (!group.has(memberId)) {
+        failAll(Errors.UNKNOWN_MEMBER_ID)
+        None
+      } else if (generationId != group.generationId) {
+        failAll(Errors.ILLEGAL_GENERATION)
+        None
+      } else {
+        // a successful commit also counts as a sign of life from the member
+        val member = group.get(memberId)
+        completeAndScheduleNextHeartbeatExpiration(group, member)
+        groupManager.prepareStoreOffsets(group, memberId, generationId,
+          offsetMetadata, responseCallback)
+      }
+    }
+
+    // store the offsets without holding the group lock
+    delayedOffsetStore.foreach(groupManager.store)
+  }
+
+  /**
+   * Entry point for an OffsetFetch request.
+   *
+   * @param groupId    the group whose committed offsets are requested
+   * @param partitions the partitions to look up; passed through unchanged to
+   *                   `groupManager.getOffsets`
+   * @return the error code together with the fetched offsets; the map is empty whenever the
+   *         error is not NONE
+   */
+  def handleFetchOffsets(groupId: String,
+                         partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
+    val coordinatorError: Option[Errors] =
+      if (!isActive.get) {
+        Some(Errors.COORDINATOR_NOT_AVAILABLE)
+      } else if (!isCoordinatorForGroup(groupId)) {
+        debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
+        Some(Errors.NOT_COORDINATOR)
+      } else if (isCoordinatorLoadInProgress(groupId)) {
+        Some(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+      } else {
+        None
+      }
+
+    coordinatorError match {
+      case Some(error) =>
+        (error, Map())
+      case None =>
+        // return offsets blindly regardless the current group state since the group may be using
+        // Kafka commit storage without automatic group management
+        (Errors.NONE, groupManager.getOffsets(groupId, partitions))
+    }
+  }
+
+  /**
+   * Lists the groups currently held by this coordinator.
+   *
+   * @return COORDINATOR_NOT_AVAILABLE with an empty list when the coordinator is inactive;
+   *         otherwise the overviews of all current groups, with COORDINATOR_LOAD_IN_PROGRESS
+   *         as the error while the group manager is still loading and NONE otherwise
+   */
+  def handleListGroups(): (Errors, List[GroupOverview]) = {
+    if (isActive.get) {
+      val status = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
+      val overviews = groupManager.currentGroups.map(_.overview).toList
+      (status, overviews)
+    } else {
+      (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
+    }
+  }
+
+  /**
+   * Describes a single group.
+   *
+   * @return a coordinator-level error paired with `GroupCoordinator.EmptyGroup`, or NONE paired
+   *         with the group's summary (`GroupCoordinator.DeadGroup` when the group is unknown)
+   */
+  def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
+    val coordinatorError: Option[Errors] =
+      if (!isActive.get)
+        Some(Errors.COORDINATOR_NOT_AVAILABLE)
+      else if (!isCoordinatorForGroup(groupId))
+        Some(Errors.NOT_COORDINATOR)
+      else if (isCoordinatorLoadInProgress(groupId))
+        Some(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+      else
+        None
+
+    coordinatorError match {
+      case Some(error) =>
+        (error, GroupCoordinator.EmptyGroup)
+
+      case None =>
+        groupManager.getGroup(groupId) match {
+          case Some(group) =>
+            // take the summary under the group lock
+            group synchronized {
+              (Errors.NONE, group.summary)
+            }
+          case None =>
+            (Errors.NONE, GroupCoordinator.DeadGroup)
+        }
+    }
+  }
+
+  /**
+   * Asks the group manager to clean up group metadata for the given partitions.
+   *
+   * @param topicPartitions the partitions that have been deleted
+   */
+  def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
+    val deletedPartitions: Option[Seq[TopicPartition]] = Some(topicPartitions)
+    groupManager.cleanupGroupMetadata(deletedPartitions)
+  }
+
+  /**
+   * Marks a group as Dead while holding its lock and releases whatever was pending on it.
+   *
+   * If the group was in PreparingRebalance, every parked join callback is answered with
+   * NOT_COORDINATOR and the join purgatory is checked. If it was Stable or AwaitingSync, every
+   * parked sync callback is answered with NOT_COORDINATOR and each member's heartbeat
+   * purgatory key is checked. Nothing else is done for Empty or Dead groups.
+   */
+  private def onGroupUnloaded(group: GroupMetadata) {
+    group synchronized {
+      info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
+      val stateBeforeUnload = group.currentState
+      group.transitionTo(Dead)
+
+      stateBeforeUnload match {
+        case PreparingRebalance =>
+          group.allMemberMetadata.foreach { pending =>
+            if (pending.awaitingJoinCallback != null) {
+              pending.awaitingJoinCallback(joinError(pending.memberId, Errors.NOT_COORDINATOR))
+              pending.awaitingJoinCallback = null
+            }
+          }
+          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+
+        case Stable | AwaitingSync =>
+          group.allMemberMetadata.foreach { pending =>
+            if (pending.awaitingSyncCallback != null) {
+              pending.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR)
+              pending.awaitingSyncCallback = null
+            }
+            heartbeatPurgatory.checkAndComplete(MemberKey(pending.groupId, pending.memberId))
+          }
+
+        case Empty | Dead =>
+          // nothing is parked on an empty or already dead group
+      }
+    }
+  }
+
+  /**
+   * Schedules a heartbeat expiration for every member of a group that has just been loaded.
+   * The group is expected to be either Stable or Empty; the work is done under the group lock.
+   */
+  private def onGroupLoaded(group: GroupMetadata) {
+    group synchronized {
+      info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
+      assert(group.is(Stable) || group.is(Empty))
+      for (member <- group.allMemberMetadata)
+        completeAndScheduleNextHeartbeatExpiration(group, member)
+    }
+  }
+
+  /**
+   * Asks the group manager to load the groups of the given offsets topic partition, passing
+   * `onGroupLoaded` as the callback.
+   *
+   * @param offsetTopicPartitionId id of the offsets topic partition to load
+   */
+  def handleGroupImmigration(offsetTopicPartitionId: Int) {
+    val onLoaded: GroupMetadata => Unit = onGroupLoaded
+    groupManager.loadGroupsForPartition(offsetTopicPartitionId, onLoaded)
+  }
+
+  /**
+   * Asks the group manager to remove the groups of the given offsets topic partition, passing
+   * `onGroupUnloaded` as the callback.
+   *
+   * @param offsetTopicPartitionId id of the offsets topic partition to unload
+   */
+  def handleGroupEmigration(offsetTopicPartitionId: Int) {
+    val onUnloaded: GroupMetadata => Unit = onGroupUnloaded
+    groupManager.removeGroupsForPartition(offsetTopicPartitionId, onUnloaded)
+  }
+
+  /**
+   * Stores each member's entry from `assignment` on the member and then answers all parked
+   * sync callbacks with NONE. The group must be in AwaitingSync, and `assignment` must contain
+   * an entry for every member of the group.
+   */
+  private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
+    assert(group.is(AwaitingSync))
+    for (member <- group.allMemberMetadata)
+      member.assignment = assignment(member.memberId)
+    propagateAssignment(group, Errors.NONE)
+  }
+
+  /**
+   * Clears every member's assignment and then answers all parked sync callbacks with `error`.
+   * The group must be in AwaitingSync.
+   */
+  private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
+    assert(group.is(AwaitingSync))
+    for (member <- group.allMemberMetadata)
+      member.assignment = Array.empty[Byte]
+    propagateAssignment(group, error)
+  }
+
+  /**
+   * Answers every parked sync callback with the member's current assignment and `error`,
+   * clears the callback, and reschedules that member's heartbeat expiration.
+   */
+  private def propagateAssignment(group: GroupMetadata, error: Errors) {
+    group.allMemberMetadata.foreach { member =>
+      if (member.awaitingSyncCallback != null) {
+        member.awaitingSyncCallback(member.assignment, error)
+        member.awaitingSyncCallback = null
+
+        // reset the session timeout after propagating the member's assignment: if the
+        // member's session expired while we were still awaiting either the leader's sync
+        // group or the storage callback, that expiration was ignored and no further
+        // heartbeat expiration would otherwise be scheduled.
+        completeAndScheduleNextHeartbeatExpiration(group, member)
+      }
+    }
+  }
+
+  /**
+   * Checks that a group id is usable.
+   *
+   * @return true when `groupId` is neither null nor the empty string
+   */
+  private def validGroupId(groupId: String): Boolean =
+    Option(groupId).exists(_.nonEmpty)
+
+  /**
+   * Builds the JoinGroup result used for failed joins: no members, generation 0,
+   * `GroupCoordinator.NoProtocol`, `GroupCoordinator.NoLeader`, and the given error.
+   *
+   * @param memberId the member id echoed back to the caller
+   * @param error    the error to report
+   */
+  private def joinError(memberId: String, error: Errors): JoinGroupResult =
+    JoinGroupResult(
+      error = error,
+      memberId = memberId,
+      members = Map.empty,
+      generationId = 0,
+      subProtocol = GroupCoordinator.NoProtocol,
+      leaderId = GroupCoordinator.NoLeader)
+
+  /**
+   * Complete existing DelayedHeartbeats for the given member and schedule the next one
+   */
+  private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+    val key = MemberKey(member.groupId, member.memberId)
+
+    // record the heartbeat first, then complete the expectation that is currently pending
+    member.latestHeartbeat = time.milliseconds()
+    heartbeatPurgatory.checkAndComplete(key)
+
+    // watch a new expectation that expires one session timeout after this heartbeat
+    val sessionTimeout = member.sessionTimeoutMs
+    val nextDeadline = member.latestHeartbeat + sessionTimeout
+    val nextExpectation = new DelayedHeartbeat(this, group, member, nextDeadline, sessionTimeout)
+    heartbeatPurgatory.tryCompleteElseWatch(nextExpectation, Seq(key))
+  }
+
+  /**
+   * Flags the member as leaving and asks the heartbeat purgatory to check the member's key,
+   * so that its pending heartbeat expectation can be completed.
+   */
+  private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
+    member.isLeaving = true
+    heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
+  }
+
+  /**
+   * Registers a new member with the group and starts a rebalance if the group allows one.
+   *
+   * The member id is the client id followed by "-" and a suffix generated by the group. The
+   * join callback is parked on the member before the member is added to the group.
+   *
+   * @return the metadata of the newly created member
+   */
+  private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
+                                    sessionTimeoutMs: Int,
+                                    clientId: String,
+                                    clientHost: String,
+                                    protocolType: String,
+                                    protocols: List[(String, Array[Byte])],
+                                    group: GroupMetadata,
+                                    callback: JoinCallback) = {
+    // use the client-id with a random id suffix as the member-id
+    val newMemberId = clientId + "-" + group.generateMemberIdSuffix
+    val newMember = new MemberMetadata(newMemberId, group.groupId, clientId, clientHost,
+      rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols)
+
+    newMember.awaitingJoinCallback = callback
+    group.add(newMember)
+    maybePrepareRebalance(group)
+
+    newMember
+  }
+
+  /**
+   * Parks the join callback on an existing member, replaces its supported protocols, and
+   * starts a rebalance if the group allows one.
+   */
+  private def updateMemberAndRebalance(group: GroupMetadata,
+                                       member: MemberMetadata,
+                                       protocols: List[(String, Array[Byte])],
+                                       callback: JoinCallback) {
+    member.awaitingJoinCallback = callback
+    member.supportedProtocols = protocols
+
+    maybePrepareRebalance(group)
+  }
+
+  /**
+   * Starts a rebalance when `group.canRebalance` says the group may do so; the check and the
+   * transition both happen under the group lock.
+   */
+  private def maybePrepareRebalance(group: GroupMetadata) {
+    group synchronized {
+      val rebalanceAllowed = group.canRebalance
+      if (rebalanceAllowed) {
+        prepareRebalance(group)
+      }
+    }
+  }
+
+  /**
+   * Moves the group into PreparingRebalance and registers a DelayedJoin, bounded by the
+   * group's rebalance timeout, with the join purgatory. Members whose SyncGroup request is
+   * still parked are answered with REBALANCE_IN_PROGRESS first.
+   */
+  private def prepareRebalance(group: GroupMetadata) {
+    // if any members are awaiting sync, cancel their request and have them rejoin
+    if (group.is(AwaitingSync)) {
+      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
+    }
+
+    group.transitionTo(PreparingRebalance)
+    info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
+
+    val pendingJoin = new DelayedJoin(this, group, group.rebalanceTimeoutMs)
+    joinPurgatory.tryCompleteElseWatch(pendingJoin, Seq(GroupKey(group.groupId)))
+  }
+
+  /**
+   * Removes a member from the group and reacts according to the resulting group state: a
+   * Stable or AwaitingSync group may start a rebalance, a group in PreparingRebalance has its
+   * pending join re-checked, and nothing more is done for Dead or Empty groups.
+   */
+  private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
+    trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
+    group.remove(member.memberId)
+
+    group.currentState match {
+      case PreparingRebalance =>
+        // the removed member may have been the last one the pending join was waiting for
+        joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+      case Stable | AwaitingSync =>
+        maybePrepareRebalance(group)
+      case Dead | Empty =>
+        // nothing left to do
+    }
+  }
+
+  /**
+   * Completes a pending join once every member of the group has rejoined.
+   *
+   * @param group         the group whose join is pending
+   * @param forceComplete invoked, under the group lock, only when no member is still missing
+   * @return the result of `forceComplete()` when all members have rejoined, false otherwise
+   */
+  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): Boolean = {
+    group synchronized {
+      // && short-circuits, so forceComplete() only runs when nobody is missing
+      group.notYetRejoinedMembers.isEmpty && forceComplete()
+    }
+  }
+
+  /**
+   * Hook for an expired join; currently does nothing.
+   * NOTE(review): presumably invoked by DelayedJoin on expiration -- confirm against the caller.
+   */
+  def onExpireJoin() {
+    // TODO: add metrics for restabilize timeouts
+  }
+
+  /**
+   * Finishes a join round for the group.
+   *
+   * Under the group lock, members that have not rejoined are removed. Unless the group is
+   * Dead, the next generation is initialised. If the group is then Empty, a store of the
+   * empty group is prepared and a failure to write it is only logged. Otherwise every member's
+   * parked join callback is answered with the new generation (only the leader receives the
+   * member metadata) and its heartbeat expiration is rescheduled.
+   *
+   * The prepared store, if any, is executed after the group lock has been released.
+   */
+  def onCompleteJoin(group: GroupMetadata) {
+    // filled in only when the group became empty and that has to be persisted
+    var delayedStore: Option[DelayedStore] = None
+    group synchronized {
+      // remove any members who haven't joined the group yet
+      group.notYetRejoinedMembers.foreach { failedMember =>
+        group.remove(failedMember.memberId)
+        // TODO: cut the socket connection to the client
+      }
+
+      if (!group.is(Dead)) {
+        group.initNextGeneration()
+        if (group.is(Empty)) {
+          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
+
+          delayedStore = groupManager.prepareStoreGroup(group, Map.empty, error => {
+            if (error != Errors.NONE) {
+              // we failed to write the empty group metadata. If the broker fails before another rebalance,
+              // the previous generation written to the log will become active again (and most likely timeout).
+              // This should be safe since there are no active members in an empty generation, so we just warn.
+              warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
+            }
+          })
+        } else {
+          info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
+
+          // trigger the awaiting join group response callback for all the members after rebalancing
+          for (member <- group.allMemberMetadata) {
+            assert(member.awaitingJoinCallback != null)
+            val joinResult = JoinGroupResult(
+              members = if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
+              memberId = member.memberId,
+              generationId = group.generationId,
+              subProtocol = group.protocol,
+              leaderId = group.leaderId,
+              error = Errors.NONE)
+
+            member.awaitingJoinCallback(joinResult)
+            member.awaitingJoinCallback = null
+            completeAndScheduleNextHeartbeatExpiration(group, member)
+          }
+        }
+      }
+    }
+
+    // call without holding the group lock
+    delayedStore.foreach(groupManager.store)
+  }
+
+  /**
+   * Attempts to complete a delayed heartbeat while holding the group lock.
+   *
+   * @param group             the group the member belongs to
+   * @param member            the member whose heartbeat is being tracked
+   * @param heartbeatDeadline the deadline the member's last heartbeat is compared against
+   * @param forceComplete     completion action; only invoked when the member should be kept alive
+   *                          or is leaving
+   * @return the result of `forceComplete()` when it was invoked, `false` otherwise
+   */
+  def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
+    group.synchronized {
+      val completable = shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving
+      completable && forceComplete()
+    }
+  }
+
+  /**
+   * Called when a delayed heartbeat expires. Under the group lock, the member is treated as
+   * failed unless it still qualifies to be kept alive.
+   *
+   * @param group             the group the member belongs to
+   * @param member            the member whose heartbeat expired
+   * @param heartbeatDeadline the deadline the member's last heartbeat is compared against
+   */
+  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long): Unit = {
+    group.synchronized {
+      val keepAlive = shouldKeepMemberAlive(member, heartbeatDeadline)
+      if (!keepAlive) {
+        onMemberFailure(group, member)
+      }
+    }
+  }
+
+  /** Completion hook for heartbeat operations (judging by the name); currently a deliberate no-op. */
+  def onCompleteHeartbeat(): Unit = {
+    // TODO: add metrics for complete heartbeats
+  }
+
+  /** Returns the partition that `groupManager` reports for the given group id. */
+  def partitionFor(group: String): Int = {
+    groupManager.partitionFor(group)
+  }
+
+  /**
+   * A member is kept alive when it has a join or sync callback pending, or when its latest
+   * heartbeat plus its session timeout lies beyond the given deadline.
+   */
+  private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) = {
+    val hasPendingCallback = member.awaitingJoinCallback != null || member.awaitingSyncCallback != null
+    hasPendingCallback || member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
+  }
+
+  /** True when `groupManager` reports the group as local. */
+  private def isCoordinatorForGroup(groupId: String) = {
+    groupManager.isGroupLocal(groupId)
+  }
+
+  /** True when `groupManager` reports the group as still loading. */
+  private def isCoordinatorLoadInProgress(groupId: String) = {
+    groupManager.isGroupLoading(groupId)
+  }
+}
+
+/**
+ * Companion of the group coordinator: shared placeholder values plus factory methods that wire
+ * a coordinator from the broker configuration.
+ */
+object GroupCoordinator {
+
+  // placeholder values used when a piece of group information is not available
+  val NoState = ""
+  val NoProtocolType = ""
+  val NoProtocol = ""
+  val NoLeader = ""
+  val NoMembers = List.empty[MemberSummary]
+
+  // canned summaries built purely from the placeholders above
+  val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
+  val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
+
+  /**
+   * Builds a coordinator, creating the "Heartbeat" and "Rebalance" purgatories for this broker
+   * before delegating to the full factory method.
+   */
+  def apply(config: KafkaConfig,
+            zkUtils: ZkUtils,
+            replicaManager: ReplicaManager,
+            time: Time): GroupCoordinator = {
+    val heartbeats = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
+    val rebalances = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
+    apply(config, zkUtils, replicaManager, heartbeats, rebalances, time)
+  }
+
+  /** Derives the offset-related settings from the broker configuration. */
+  private[group] def offsetConfig(config: KafkaConfig) =
+    OffsetConfig(
+      maxMetadataSize = config.offsetMetadataMaxSize,
+      loadBufferSize = config.offsetsLoadBufferSize,
+      // the retention is configured in minutes; convert it to milliseconds
+      offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
+      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+      offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
+      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+      offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
+      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+
+  /**
+   * Builds a coordinator from the broker configuration using the supplied purgatories; the
+   * group metadata manager is created here.
+   */
+  def apply(config: KafkaConfig,
+            zkUtils: ZkUtils,
+            replicaManager: ReplicaManager,
+            heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+            joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+            time: Time): GroupCoordinator = {
+    val offsetSettings = offsetConfig(config)
+    val sessionSettings = GroupConfig(config.groupMinSessionTimeoutMs, config.groupMaxSessionTimeoutMs)
+    val metadataManager = new GroupMetadataManager(
+      config.brokerId,
+      config.interBrokerProtocolVersion,
+      offsetSettings,
+      replicaManager,
+      zkUtils,
+      time)
+
+    new GroupCoordinator(
+      config.brokerId,
+      sessionSettings,
+      offsetSettings,
+      metadataManager,
+      heartbeatPurgatory,
+      joinPurgatory,
+      time)
+  }
+
+}
+
+/**
+ * Minimum and maximum group session timeout, in milliseconds, as taken from the broker
+ * configuration.
+ */
+case class GroupConfig(
+  groupMinSessionTimeoutMs: Int,
+  groupMaxSessionTimeoutMs: Int
+)
+
+/**
+ * Result handed to a member's join callback.
+ *
+ * @param members      member id to member metadata; the coordinator fills this only for the
+ *                     group leader and passes an empty map to every other member
+ * @param memberId     id of the member receiving this result
+ * @param generationId generation of the group
+ * @param subProtocol  protocol selected for the group
+ * @param leaderId     id of the group leader
+ * @param error        outcome of the join
+ */
+case class JoinGroupResult(
+  members: Map[String, Array[Byte]],
+  memberId: String,
+  generationId: Int,
+  subProtocol: String,
+  leaderId: String,
+  error: Errors
+)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
new file mode 100644
index 0000000..b433284
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import java.util.UUID
+
+import kafka.common.OffsetAndMetadata
+import kafka.utils.nonthreadsafe
+import org.apache.kafka.common.TopicPartition
+
+import scala.collection.{Seq, immutable, mutable}
+
+/** A state of a group; every state exposes a byte value through `state`. */
+private[group] sealed trait GroupState {
+  def state: Byte
+}
+
+/**
+ * The group is getting ready to rebalance.
+ *
+ * action: answer heartbeats with REBALANCE_IN_PROGRESS
+ *         answer sync group with REBALANCE_IN_PROGRESS
+ *         remove the member on a leave group request
+ *         park join group requests from new or existing members until all expected members have joined
+ *         accept offset commits from the previous generation
+ *         accept offset fetch requests
+ * transition: some members have joined by the timeout => AwaitingSync
+ *             all members have left the group => Empty
+ *             group is removed by partition emigration => Dead
+ */
+private[group] case object PreparingRebalance extends GroupState {
+  val state: Byte = 1
+}
+
+/**
+ * Group is awaiting state assignment from the leader
+ *
+ * action: respond to heartbeats with REBALANCE_IN_PROGRESS
+ *         respond to offset commits with REBALANCE_IN_PROGRESS
+ *         park sync group requests from followers until transition to Stable
+ *         allow offset fetch requests
+ * transition: sync group with state assignment received from leader => Stable
+ *             join group from new member or existing member with updated metadata => PreparingRebalance
+ *             leave group from existing member => PreparingRebalance
+ *             member failure detected => PreparingRebalance
+ *             group is removed by partition emigration => Dead
+ */
+// Each state carries its own byte value. The other states use 1, 3, 4 and 5, so this one must
+// be 2; using 5 here would make it indistinguishable from Empty by its byte value.
+private[group] case object AwaitingSync extends GroupState { val state: Byte = 2 }
+
+/**
+ * The group is stable.
+ *
+ * action: answer member heartbeats normally
+ *         answer sync group from any member with the current assignment
+ *         answer join group from followers with matching metadata with the current group metadata
+ *         accept offset commits from members of the current generation
+ *         accept offset fetch requests
+ * transition: member failure detected via heartbeat => PreparingRebalance
+ *             leave group from existing member => PreparingRebalance
+ *             leader join-group received => PreparingRebalance
+ *             follower join-group with new metadata => PreparingRebalance
+ *             group is removed by partition emigration => Dead
+ */
+private[group] case object Stable extends GroupState {
+  val state: Byte = 3
+}
+
+/**
+ * The group has no more members and its metadata is being removed.
+ *
+ * action: answer join group with UNKNOWN_MEMBER_ID
+ *         answer sync group with UNKNOWN_MEMBER_ID
+ *         answer heartbeat with UNKNOWN_MEMBER_ID
+ *         answer leave group with UNKNOWN_MEMBER_ID
+ *         answer offset commit with UNKNOWN_MEMBER_ID
+ *         accept offset fetch requests
+ * transition: Dead is the final state before the group metadata is cleaned up, so there are no transitions
+ */
+private[group] case object Dead extends GroupState {
+  val state: Byte = 4
+}
+
+/**
+ * The group has no more members, but lingers until all of its offsets have expired. This state
+ * also covers groups that use Kafka only for offset commits and never have members.
+ *
+ * action: answer join group from new members normally
+ *         answer sync group with UNKNOWN_MEMBER_ID
+ *         answer heartbeat with UNKNOWN_MEMBER_ID
+ *         answer leave group with UNKNOWN_MEMBER_ID
+ *         answer offset commit with UNKNOWN_MEMBER_ID
+ *         accept offset fetch requests
+ * transition: last offsets removed in periodic expiration task => Dead
+ *             join group from a new member => PreparingRebalance
+ *             group is removed by partition emigration => Dead
+ *             group is removed by expiration => Dead
+ */
+private[group] case object Empty extends GroupState {
+  val state: Byte = 5
+}
+
+
+private object GroupMetadata {
+  // For every target state: the states from which a transition into that target is permitted.
+  private val validPreviousStates: Map[GroupState, Set[GroupState]] = Map(
+    Dead -> Set[GroupState](Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
+    AwaitingSync -> Set[GroupState](PreparingRebalance),
+    Stable -> Set[GroupState](AwaitingSync),
+    PreparingRebalance -> Set[GroupState](Stable, AwaitingSync, Empty),
+    Empty -> Set[GroupState](PreparingRebalance)
+  )
+}
+
+/**
+ * Group metadata as exposed through the ListGroups API.
+ *
+ * @param groupId      id of the group
+ * @param protocolType protocol type of the group
+ */
+case class GroupOverview(
+  groupId: String,
+  protocolType: String
+)
+
+/**
+ * Group metadata as exposed through the DescribeGroup API.
+ *
+ * @param state        textual name of the group state
+ * @param protocolType protocol type of the group
+ * @param protocol     protocol of the group
+ * @param members      summaries of the group members
+ */
+case class GroupSummary(
+  state: String,
+  protocolType: String,
+  protocol: String,
+  members: List[MemberSummary]
+)
+
+/**
+ * In-memory metadata of a single group.
+ *
+ *  Membership metadata:
+ *  1. the members registered in the group
+ *  2. the protocol currently assigned to the group (e.g. partition assignment strategy for consumers)
+ *  3. the protocol metadata associated with the group members
+ *
+ *  State metadata:
+ *  1. group state
+ *  2. generation id
+ *  3. leader id
+ *
+ * In addition, the class tracks the group's committed offsets and the offset commits that are
+ * still pending. It does no locking of its own.
+ */
+@nonthreadsafe
+private[group] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {
+
+  private var state: GroupState = initialState
+  private val members = mutable.HashMap.empty[String, MemberMetadata]
+  private val offsets = mutable.HashMap.empty[TopicPartition, OffsetAndMetadata]
+  private val pendingOffsetCommits = mutable.HashMap.empty[TopicPartition, OffsetAndMetadata]
+
+  var protocolType: Option[String] = None
+  var generationId = 0
+  var leaderId: String = null
+  var protocol: String = null
+
+  /** True when the group is currently in the given state. */
+  def is(groupState: GroupState): Boolean = state == groupState
+
+  /** True when the group is currently in any state other than the given one. */
+  def not(groupState: GroupState): Boolean = !is(groupState)
+
+  /** True when a member with the given id is registered. */
+  def has(memberId: String): Boolean = members.contains(memberId)
+
+  /** Looks up a registered member; fails if the id is unknown. */
+  def get(memberId: String) = members(memberId)
+
+  /**
+   * Registers a member. The first member of an empty group determines the group's protocol
+   * type, and the first member added while no leader is set becomes the leader.
+   */
+  def add(member: MemberMetadata): Unit = {
+    if (members.isEmpty)
+      protocolType = Some(member.protocolType)
+
+    assert(groupId == member.groupId)
+    assert(protocolType.orNull == member.protocolType)
+    assert(supportsProtocols(member.protocols))
+
+    if (leaderId == null)
+      leaderId = member.memberId
+    members(member.memberId) = member
+  }
+
+  /**
+   * Unregisters a member. If it was the leader, leadership passes to an arbitrary remaining
+   * member, or is cleared when none is left.
+   */
+  def remove(memberId: String): Unit = {
+    members -= memberId
+    if (memberId == leaderId)
+      leaderId = members.keys.headOption.orNull
+  }
+
+  def currentState = state
+
+  /** Members without a pending join callback. */
+  def notYetRejoinedMembers = allMemberMetadata.filter(_.awaitingJoinCallback == null)
+
+  def allMembers = members.keySet
+
+  def allMemberMetadata = members.values.toList
+
+  /** Largest rebalance timeout among the members; 0 for a group without members. */
+  def rebalanceTimeoutMs = members.values.map(_.rebalanceTimeoutMs).foldLeft(0)(_ max _)
+
+  // TODO: decide if ids should be predictable or random
+  def generateMemberIdSuffix = UUID.randomUUID().toString
+
+  /** True when the current state permits a transition to PreparingRebalance. */
+  def canRebalance = {
+    val rebalanceableStates = GroupMetadata.validPreviousStates(PreparingRebalance)
+    rebalanceableStates.contains(state)
+  }
+
+  /** Moves the group to the given state, failing if that transition is not permitted. */
+  def transitionTo(groupState: GroupState): Unit = {
+    assertValidTransition(groupState)
+    state = groupState
+  }
+
+  /**
+   * Chooses the group protocol: every member votes for one of the protocols supported by all
+   * members and the protocol with the most votes wins.
+   */
+  def selectProtocol: String = {
+    if (members.isEmpty)
+      throw new IllegalStateException("Cannot select protocol for empty group")
+
+    // only protocols supported by every member are eligible
+    val candidates = candidateProtocols
+
+    val ballots = allMemberMetadata.map(member => member.vote(candidates))
+    val tally: List[(String, Int)] = ballots
+      .groupBy(identity)
+      .toList
+      .map { case (votedProtocol, votesForIt) => (votedProtocol, votesForIt.size) }
+
+    val (winner, _) = tally.maxBy { case (_, voteCount) => voteCount }
+    winner
+  }
+
+  // the protocols that every single member supports
+  private def candidateProtocols = {
+    val protocolsPerMember = allMemberMetadata.map(_.protocols)
+    protocolsPerMember.reduceLeft(_ intersect _)
+  }
+
+  /** True for an empty group, or when the given protocols overlap with those common to all members. */
+  def supportsProtocols(memberProtocols: Set[String]) = {
+    members.isEmpty || memberProtocols.intersect(candidateProtocols).nonEmpty
+  }
+
+  /**
+   * Starts the next generation: the generation id is incremented and the group moves to
+   * AwaitingSync with a freshly selected protocol, or to Empty when there are no members.
+   */
+  def initNextGeneration(): Unit = {
+    assert(notYetRejoinedMembers.isEmpty)
+    generationId += 1
+    if (members.isEmpty) {
+      protocol = null
+      transitionTo(Empty)
+    } else {
+      protocol = selectProtocol
+      transitionTo(AwaitingSync)
+    }
+  }
+
+  /** Metadata of each member for the current protocol; not available while Dead or PreparingRebalance. */
+  def currentMemberMetadata: Map[String, Array[Byte]] = {
+    state match {
+      case Dead | PreparingRebalance =>
+        throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
+      case _ =>
+        members.map { case (id, metadata) => id -> metadata.metadata(protocol) }.toMap
+    }
+  }
+
+  /** Summary of the group; protocol and member metadata are only included while Stable. */
+  def summary: GroupSummary = {
+    val (reportedProtocol, memberSummaries) =
+      if (is(Stable))
+        (protocol, members.values.map(_.summary(protocol)).toList)
+      else
+        (GroupCoordinator.NoProtocol, members.values.map(_.summaryNoMetadata()).toList)
+
+    GroupSummary(state.toString, protocolType.getOrElse(""), reportedProtocol, memberSummaries)
+  }
+
+  def overview: GroupOverview = GroupOverview(groupId, protocolType.getOrElse(""))
+
+  /** Adds the given offsets to the committed offsets of the group. */
+  def initializeOffsets(offsets: collection.Map[TopicPartition, OffsetAndMetadata]): Unit = {
+    this.offsets ++= offsets
+  }
+
+  /**
+   * Records a successful offset write. The offset is only stored when a commit is pending for
+   * the partition; the pending entry is cleared when it equals the written offset.
+   */
+  def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
+    pendingOffsetCommits.get(topicPartition).foreach { stagedOffset =>
+      offsets.put(topicPartition, offset)
+      if (offset == stagedOffset)
+        pendingOffsetCommits.remove(topicPartition)
+    }
+  }
+
+  /** Records a failed offset write: the pending entry is cleared when it equals the given offset. */
+  def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
+    val matchesPending = pendingOffsetCommits.get(topicPartition).exists(pendingOffset => offset == pendingOffset)
+    if (matchesPending)
+      pendingOffsetCommits.remove(topicPartition)
+  }
+
+  /** Stages the given offsets as pending commits. */
+  def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {
+    pendingOffsetCommits ++= offsets
+  }
+
+  /** Drops committed and pending offsets of the given partitions; returns the committed ones that were removed. */
+  def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
+    topicPartitions.flatMap { partition =>
+      pendingOffsetCommits -= partition
+      offsets.remove(partition).map(removedOffset => partition -> removedOffset)
+    }.toMap
+  }
+
+  /**
+   * Removes and returns the committed offsets whose expiration timestamp lies before `startMs`,
+   * skipping partitions that still have a pending commit.
+   */
+  def removeExpiredOffsets(startMs: Long) = {
+    val expired = offsets.filter { case (partition, committed) =>
+      !pendingOffsetCommits.contains(partition) && committed.expireTimestamp < startMs
+    }
+    offsets --= expired.keySet
+    expired.toMap
+  }
+
+  def allOffsets = offsets.toMap
+
+  def offset(topicPartition: TopicPartition) = offsets.get(topicPartition)
+
+  def numOffsets = offsets.size
+
+  /** True when the group has committed offsets or commits that are still pending. */
+  def hasOffsets = pendingOffsetCommits.nonEmpty || offsets.nonEmpty
+
+  // throws when the current state is not a permitted predecessor of the target state
+  private def assertValidTransition(targetState: GroupState): Unit = {
+    val permittedPredecessors = GroupMetadata.validPreviousStates(targetState)
+    if (!permittedPredecessors.contains(state))
+      throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
+        .format(groupId, permittedPredecessors.mkString(","), targetState, state))
+  }
+
+  override def toString: String =
+    "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
+}
+