You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/16 02:47:25 UTC
[2/2] kafka git commit: KAFKA-2720: expire group metadata when all
offsets have expired
KAFKA-2720: expire group metadata when all offsets have expired
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Liquan Pei, Onur Karaman, Guozhang Wang
Closes #1427 from hachikuji/KAFKA-2720
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c551675
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c551675
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c551675
Branch: refs/heads/trunk
Commit: 8c551675adb11947e9f27b20a9195c9c4a20b432
Parents: fb42558
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jun 15 19:46:42 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 15 19:46:42 2016 -0700
----------------------------------------------------------------------
.../scala/kafka/coordinator/DelayedJoin.scala | 4 +-
.../kafka/coordinator/GroupCoordinator.scala | 304 +++++------
.../scala/kafka/coordinator/GroupMetadata.scala | 110 +++-
.../coordinator/GroupMetadataManager.scala | 498 ++++++++++---------
.../kafka/coordinator/MemberMetadata.scala | 1 +
.../GroupCoordinatorResponseTest.scala | 7 +-
.../coordinator/GroupMetadataManagerTest.scala | 407 +++++++++++++++
.../kafka/coordinator/GroupMetadataTest.scala | 155 +++++-
.../kafka/coordinator/MemberMetadataTest.scala | 11 +-
9 files changed, 1081 insertions(+), 416 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
index ae96e15..a62884a 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
@@ -30,8 +30,8 @@ import kafka.server.DelayedOperation
* the rest of the group.
*/
private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
- group: GroupMetadata,
- sessionTimeout: Long)
+ group: GroupMetadata,
+ sessionTimeout: Long)
extends DelayedOperation(sessionTimeout) {
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index e9bbbd3..9c75f83 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -31,15 +31,6 @@ import org.apache.kafka.common.requests.{OffsetFetchResponse, JoinGroupRequest}
import scala.collection.{Map, Seq, immutable}
-case class GroupConfig(groupMinSessionTimeoutMs: Int,
- groupMaxSessionTimeoutMs: Int)
-
-case class JoinGroupResult(members: Map[String, Array[Byte]],
- memberId: String,
- generationId: Int,
- subProtocol: String,
- leaderId: String,
- errorCode: Short)
/**
* GroupCoordinator handles general group membership and offset management.
@@ -77,8 +68,10 @@ class GroupCoordinator(val brokerId: Int,
/**
* Startup logic executed at the same time when the server starts up.
*/
- def startup() {
+ def startup(enableMetadataExpiration: Boolean = true) {
info("Starting up.")
+ if (enableMetadataExpiration)
+ groupManager.enableMetadataExpiration()
isActive.set(true)
info("Startup complete.")
}
@@ -119,16 +112,17 @@ class GroupCoordinator(val brokerId: Int,
// only try to create the group if the group is not unknown AND
// the member id is UNKNOWN, if member is specified but group does not
// exist we should reject the request
- var group = groupManager.getGroup(groupId)
- if (group == null) {
- if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
- } else {
- group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))
+ groupManager.getGroup(groupId) match {
+ case None =>
+ if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+ } else {
+ val group = groupManager.addGroup(new GroupMetadata(groupId))
+ doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ }
+
+ case Some(group) =>
doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
- }
- } else {
- doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
}
}
@@ -142,7 +136,7 @@ class GroupCoordinator(val brokerId: Int,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {
group synchronized {
- if (group.protocolType != protocolType || !group.supportsProtocols(protocols.map(_._1).toSet)) {
+ if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
// if the new member does not support the group protocol, reject it
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
@@ -160,7 +154,7 @@ class GroupCoordinator(val brokerId: Int,
case PreparingRebalance =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -168,7 +162,7 @@ class GroupCoordinator(val brokerId: Int,
case AwaitingSync =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (member.matches(protocols)) {
@@ -192,10 +186,10 @@ class GroupCoordinator(val brokerId: Int,
}
}
- case Stable =>
+ case Empty | Stable =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// if the member id is unknown, register the member to the group
- addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
+ addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -233,11 +227,10 @@ class GroupCoordinator(val brokerId: Int,
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)
} else {
- val group = groupManager.getGroup(groupId)
- if (group == null)
- responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
- else
- doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+ groupManager.getGroup(groupId) match {
+ case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+ case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+ }
}
}
@@ -255,7 +248,7 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)
} else {
group.currentState match {
- case Dead =>
+ case Empty | Dead =>
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
case PreparingRebalance =>
@@ -301,7 +294,8 @@ class GroupCoordinator(val brokerId: Int,
}
// store the group metadata without holding the group lock to avoid the potential
- // for deadlock when the callback is invoked
+ // for deadlock if the callback is invoked holding other locks (e.g. the replica
+ // state change lock)
delayedGroupStore.foreach(groupManager.store)
}
@@ -313,26 +307,25 @@ class GroupCoordinator(val brokerId: Int,
} else if (isCoordinatorLoadingInProgress(groupId)) {
responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
} else {
- val group = groupManager.getGroup(groupId)
- if (group == null) {
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; this is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
- // joining without specified consumer id,
- responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
- } else {
- group synchronized {
- if (group.is(Dead)) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
- } else if (!group.has(consumerId)) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
- } else {
- val member = group.get(consumerId)
- removeHeartbeatForLeavingMember(group, member)
- onMemberFailure(group, member)
- responseCallback(Errors.NONE.code)
+ groupManager.getGroup(groupId) match {
+ case None =>
+ // if the group is marked as dead, it means some other thread has just removed the group
+ // from the coordinator metadata; it is likely that the group has migrated to some other
+ // coordinator OR the group is in a transient unstable phase. Let the consumer retry
+ // joining without the specified consumer id.
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
+ case Some(group) =>
+ group synchronized {
+ if (group.is(Dead) || !group.has(consumerId)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else {
+ val member = group.get(consumerId)
+ removeHeartbeatForLeavingMember(group, member)
+ onMemberFailure(group, member)
+ responseCallback(Errors.NONE.code)
+ }
}
- }
}
}
}
@@ -349,29 +342,30 @@ class GroupCoordinator(val brokerId: Int,
// the group is still loading, so respond just blindly
responseCallback(Errors.NONE.code)
} else {
- val group = groupManager.getGroup(groupId)
- if (group == null) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
- } else {
- group synchronized {
- if (group.is(Dead)) {
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; this is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the member retry
- // joining without the specified member id,
- responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
- } else if (!group.is(Stable)) {
- responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
- } else if (!group.has(memberId)) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
- } else if (generationId != group.generationId) {
- responseCallback(Errors.ILLEGAL_GENERATION.code)
- } else {
- val member = group.get(memberId)
- completeAndScheduleNextHeartbeatExpiration(group, member)
- responseCallback(Errors.NONE.code)
+ groupManager.getGroup(groupId) match {
+ case None => responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ case Some(group) =>
+ group synchronized {
+ if (group.is(Empty)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else if (group.is(Dead)) {
+ // if the group is marked as dead, it means some other thread has just removed the group
+ // from the coordinator metadata; it is likely that the group has migrated to some other
+ // coordinator OR the group is in a transient unstable phase. Let the member retry
+ // joining without the specified member id.
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else if (!group.is(Stable)) {
+ responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+ } else if (!group.has(memberId)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else if (generationId != group.generationId) {
+ responseCallback(Errors.ILLEGAL_GENERATION.code)
+ } else {
+ val member = group.get(memberId)
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ responseCallback(Errors.NONE.code)
+ }
}
- }
}
}
}
@@ -381,8 +375,6 @@ class GroupCoordinator(val brokerId: Int,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
- var delayedOffsetStore: Option[DelayedStore] = None
-
if (!isActive.get) {
responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
} else if (!isCoordinatorForGroup(groupId)) {
@@ -390,33 +382,48 @@ class GroupCoordinator(val brokerId: Int,
} else if (isCoordinatorLoadingInProgress(groupId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code))
} else {
- val group = groupManager.getGroup(groupId)
- if (group == null) {
- if (generationId < 0)
- // the group is not relying on Kafka for partition management, so allow the commit
- delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata,
- responseCallback))
- else
- // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
- // or this is a request coming from an older generation. either way, reject the commit
- responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
- } else {
- group synchronized {
- if (group.is(Dead)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
- } else if (group.is(AwaitingSync)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
- } else if (!group.has(memberId)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
- } else if (generationId != group.generationId) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+ groupManager.getGroup(groupId) match {
+ case None =>
+ if (generationId < 0) {
+ // the group is not relying on Kafka for group management, so allow the commit
+ val group = groupManager.addGroup(new GroupMetadata(groupId))
+ doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
} else {
- val member = group.get(memberId)
- completeAndScheduleNextHeartbeatExpiration(group, member)
- delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
- offsetMetadata, responseCallback))
+ // this is a request coming from an older generation, so reject the commit
+ responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
}
- }
+
+ case Some(group) =>
+ doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
+ }
+ }
+ }
+
+ def doCommitOffsets(group: GroupMetadata,
+ memberId: String,
+ generationId: Int,
+ offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
+ var delayedOffsetStore: Option[DelayedStore] = None
+
+ group synchronized {
+ if (group.is(Dead)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
+ } else if (generationId < 0 && group.is(Empty)) {
+ // the group is only using Kafka to store offsets
+ delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId,
+ offsetMetadata, responseCallback))
+ } else if (group.is(AwaitingSync)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
+ } else if (!group.has(memberId)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
+ } else if (generationId != group.generationId) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+ } else {
+ val member = group.get(memberId)
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ delayedOffsetStore = Some(groupManager.prepareStoreOffsets(group, memberId, generationId,
+ offsetMetadata, responseCallback))
}
}
@@ -424,12 +431,14 @@ class GroupCoordinator(val brokerId: Int,
delayedOffsetStore.foreach(groupManager.store)
}
+
def handleFetchOffsets(groupId: String,
partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
if (!isActive.get) {
partitions.map { case topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
} else if (!isCoordinatorForGroup(groupId)) {
+ debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
partitions.map { case topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
} else if (isCoordinatorLoadingInProgress(groupId)) {
@@ -459,13 +468,12 @@ class GroupCoordinator(val brokerId: Int,
} else if (isCoordinatorLoadingInProgress(groupId)) {
(Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
} else {
- val group = groupManager.getGroup(groupId)
- if (group == null) {
- (Errors.NONE, GroupCoordinator.DeadGroup)
- } else {
- group synchronized {
- (Errors.NONE, group.summary)
- }
+ groupManager.getGroup(groupId) match {
+ case None => (Errors.NONE, GroupCoordinator.DeadGroup)
+ case Some(group) =>
+ group synchronized {
+ (Errors.NONE, group.summary)
+ }
}
}
}
@@ -477,7 +485,7 @@ class GroupCoordinator(val brokerId: Int,
group.transitionTo(Dead)
previousState match {
- case Dead =>
+ case Empty | Dead =>
case PreparingRebalance =>
for (member <- group.allMemberMetadata) {
if (member.awaitingJoinCallback != null) {
@@ -502,7 +510,7 @@ class GroupCoordinator(val brokerId: Int,
private def onGroupLoaded(group: GroupMetadata) {
group synchronized {
info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
- assert(group.is(Stable))
+ assert(group.is(Stable) || group.is(Empty))
group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
}
}
@@ -580,12 +588,13 @@ class GroupCoordinator(val brokerId: Int,
private def addMemberAndRebalance(sessionTimeoutMs: Int,
clientId: String,
clientHost: String,
+ protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback) = {
// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix
- val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols)
+ val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols)
member.awaitingJoinCallback = callback
group.add(member.memberId, member)
maybePrepareRebalance(group)
@@ -626,7 +635,7 @@ class GroupCoordinator(val brokerId: Int,
trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
group.remove(member.memberId)
group.currentState match {
- case Dead =>
+ case Dead | Empty =>
case Stable | AwaitingSync => maybePrepareRebalance(group)
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
@@ -645,42 +654,49 @@ class GroupCoordinator(val brokerId: Int,
}
def onCompleteJoin(group: GroupMetadata) {
+ var delayedStore: Option[DelayedStore] = None
group synchronized {
- val failedMembers = group.notYetRejoinedMembers
- if (group.isEmpty || failedMembers.nonEmpty) {
- failedMembers.foreach { failedMember =>
- group.remove(failedMember.memberId)
- // TODO: cut the socket connection to the client
- }
-
- // TODO KAFKA-2720: only remove group in the background thread
- if (group.isEmpty) {
- group.transitionTo(Dead)
- groupManager.removeGroup(group)
- info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
- }
+ // remove any members who haven't joined the group yet
+ group.notYetRejoinedMembers.foreach { failedMember =>
+ group.remove(failedMember.memberId)
+ // TODO: cut the socket connection to the client
}
+
if (!group.is(Dead)) {
group.initNextGeneration()
- info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
-
- // trigger the awaiting join group response callback for all the members after rebalancing
- for (member <- group.allMemberMetadata) {
- assert(member.awaitingJoinCallback != null)
- val joinResult = JoinGroupResult(
- members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
- memberId=member.memberId,
- generationId=group.generationId,
- subProtocol=group.protocol,
- leaderId=group.leaderId,
- errorCode=Errors.NONE.code)
-
- member.awaitingJoinCallback(joinResult)
- member.awaitingJoinCallback = null
- completeAndScheduleNextHeartbeatExpiration(group, member)
+ if (group.is(Empty)) {
+ info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
+ delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => {
+ if (errorCode != Errors.NONE.code) {
+ // we failed to persist the empty group. if we don't retry (which is how
+ // we handle the situation when a normal rebalance fails), then a coordinator
+ // change will cause the old generation to come back to life.
+ }
+ }))
+ } else {
+ info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
+
+ // trigger the awaiting join group response callback for all the members after rebalancing
+ for (member <- group.allMemberMetadata) {
+ assert(member.awaitingJoinCallback != null)
+ val joinResult = JoinGroupResult(
+ members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
+ memberId=member.memberId,
+ generationId=group.generationId,
+ subProtocol=group.protocol,
+ leaderId=group.leaderId,
+ errorCode=Errors.NONE.code)
+
+ member.awaitingJoinCallback(joinResult)
+ member.awaitingJoinCallback = null
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ }
}
}
}
+
+ // call without holding the group lock
+ delayedStore.foreach(groupManager.store)
}
def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
@@ -757,3 +773,13 @@ object GroupCoordinator {
}
}
+
+case class GroupConfig(groupMinSessionTimeoutMs: Int,
+ groupMaxSessionTimeoutMs: Int)
+
+case class JoinGroupResult(members: Map[String, Array[Byte]],
+ memberId: String,
+ generationId: Int,
+ subProtocol: String,
+ leaderId: String,
+ errorCode: Short)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index 4fa656e..b455964 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -18,10 +18,10 @@
package kafka.coordinator
import kafka.utils.nonthreadsafe
-
import java.util.UUID
-import org.apache.kafka.common.protocol.Errors
+import kafka.common.OffsetAndMetadata
+import org.apache.kafka.common.TopicPartition
import collection.mutable
@@ -37,7 +37,8 @@ private[coordinator] sealed trait GroupState { def state: Byte }
* allow offset commits from previous generation
* allow offset fetch requests
* transition: some members have joined by the timeout => AwaitingSync
- * all members have left the group => Dead
+ * all members have left the group => Empty
+ * group is removed by partition emigration => Dead
*/
private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
@@ -52,6 +53,7 @@ private[coordinator] case object PreparingRebalance extends GroupState { val sta
* join group from new member or existing member with updated metadata => PreparingRebalance
* leave group from existing member => PreparingRebalance
* member failure detected => PreparingRebalance
+ * group is removed by partition emigration => Dead
*/
private[coordinator] case object AwaitingSync extends GroupState { val state: Byte = 5}
@@ -67,11 +69,12 @@ private[coordinator] case object AwaitingSync extends GroupState { val state: By
* leave group from existing member => PreparingRebalance
* leader join-group received => PreparingRebalance
* follower join-group with new metadata => PreparingRebalance
+ * group is removed by partition emigration => Dead
*/
private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
/**
- * Group has no more members
+ * Group has no more members and its metadata is being removed
*
* action: respond to join group with UNKNOWN_MEMBER_ID
* respond to sync group with UNKNOWN_MEMBER_ID
@@ -83,13 +86,31 @@ private[coordinator] case object Stable extends GroupState { val state: Byte = 3
*/
private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
+/**
+ * Group has no more members, but lingers until all offsets have expired. This state
+ * also represents groups which use Kafka only for offset commits and have no members.
+ *
+ * action: respond normally to join group from new members
+ * respond to sync group with UNKNOWN_MEMBER_ID
+ * respond to heartbeat with UNKNOWN_MEMBER_ID
+ * respond to leave group with UNKNOWN_MEMBER_ID
+ * respond to offset commit with UNKNOWN_MEMBER_ID (commits with generationId < 0 are accepted)
+ * allow offset fetch requests
+ * transition: last offsets removed in periodic expiration task => Dead
+ * join group from a new member => PreparingRebalance
+ * group is removed by partition emigration => Dead
+ * group is removed by expiration => Dead
+ */
+private[coordinator] case object Empty extends GroupState { val state: Byte = 5 }
+
private object GroupMetadata {
private val validPreviousStates: Map[GroupState, Set[GroupState]] =
- Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync),
+ Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
AwaitingSync -> Set(PreparingRebalance),
Stable -> Set(AwaitingSync),
- PreparingRebalance -> Set(Stable, AwaitingSync))
+ PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
+ Empty -> Set(PreparingRebalance))
}
/**
@@ -120,10 +141,14 @@ case class GroupSummary(state: String,
* 3. leader id
*/
@nonthreadsafe
-private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {
+private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {
+ private var state: GroupState = initialState
private val members = new mutable.HashMap[String, MemberMetadata]
- private var state: GroupState = Stable
+ private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
+ private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
+
+ var protocolType: Option[String] = None
var generationId = 0
var leaderId: String = null
var protocol: String = null
@@ -134,6 +159,11 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
def get(memberId: String) = members(memberId)
def add(memberId: String, member: MemberMetadata) {
+ if (members.isEmpty)
+ this.protocolType = Some(member.protocolType)
+
+ assert(groupId == member.groupId)
+ assert(this.protocolType.orNull == member.protocolType)
assert(supportsProtocols(member.protocols))
if (leaderId == null)
@@ -154,8 +184,6 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
def currentState = state
- def isEmpty = members.isEmpty
-
def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
def allMembers = members.keySet
@@ -169,7 +197,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
// TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString
- def canRebalance = state == Stable || state == AwaitingSync
+ def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
def transitionTo(groupState: GroupState) {
assertValidTransition(groupState)
@@ -201,14 +229,20 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
}
def supportsProtocols(memberProtocols: Set[String]) = {
- isEmpty || (memberProtocols & candidateProtocols).nonEmpty
+ members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
}
def initNextGeneration() = {
assert(notYetRejoinedMembers == List.empty[MemberMetadata])
- generationId += 1
- protocol = selectProtocol
- transitionTo(AwaitingSync)
+ if (members.nonEmpty) {
+ generationId += 1
+ protocol = selectProtocol
+ transitionTo(AwaitingSync)
+ } else {
+ generationId += 1
+ protocol = null
+ transitionTo(Empty)
+ }
}
def currentMemberMetadata: Map[String, Array[Byte]] = {
@@ -219,18 +253,53 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
def summary: GroupSummary = {
if (is(Stable)) {
- val members = this.members.values.map{ member => member.summary(protocol) }.toList
- GroupSummary(state.toString, protocolType, protocol, members)
+ val members = this.members.values.map { member => member.summary(protocol) }.toList
+ GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members)
} else {
val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList
- GroupSummary(state.toString, protocolType, GroupCoordinator.NoProtocol, members)
+ GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members)
}
}
def overview: GroupOverview = {
- GroupOverview(groupId, protocolType)
+ GroupOverview(groupId, protocolType.getOrElse(""))
+ }
+
+ def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata) {
+ offsets.put(topicPartition, offset)
+ pendingOffsetCommits.get(topicPartition) match {
+ case Some(stagedOffset) if offset == stagedOffset => pendingOffsetCommits.remove(topicPartition)
+ case _ =>
+ }
+ }
+
+ def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
+ pendingOffsetCommits.get(topicPartition) match {
+ case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)
+ case _ =>
+ }
+ }
+
+ def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
+ pendingOffsetCommits ++= offsets
+ }
+
+ def removeExpiredOffsets(startMs: Long) = {
+ val expiredOffsets = offsets.filter {
+ case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
+ }
+ offsets --= expiredOffsets.keySet
+ expiredOffsets
}
+ def allOffsets = offsets.toMap
+
+ def offset(topicPartition: TopicPartition) = offsets.get(topicPartition)
+
+ def numOffsets = offsets.size
+
+ def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty
+
private def assertValidTransition(targetState: GroupState) {
if (!GroupMetadata.validPreviousStates(targetState).contains(state))
throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
@@ -240,4 +309,5 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
override def toString = {
"[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
}
-}
\ No newline at end of file
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index b968f97..915c360 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -17,12 +17,11 @@
package kafka.coordinator
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.utils.CoreUtils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
import org.apache.kafka.common.protocol.types.Type.STRING
+import org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING
import org.apache.kafka.common.protocol.types.Type.INT32
import org.apache.kafka.common.protocol.types.Type.INT64
import org.apache.kafka.common.protocol.types.Type.BYTES
@@ -39,16 +38,18 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import kafka.common.MessageFormatter
import kafka.server.ReplicaManager
+
import scala.collection._
import java.io.PrintStream
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
import com.yammer.metrics.core.Gauge
+import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.internals.TopicConstants
-case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
- callback: Map[TopicPartition, PartitionResponse] => Unit)
class GroupMetadataManager(val brokerId: Int,
val config: OffsetConfig,
@@ -56,72 +57,77 @@ class GroupMetadataManager(val brokerId: Int,
zkUtils: ZkUtils,
time: Time) extends Logging with KafkaMetricsGroup {
- /* offsets cache */
- private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
+ private val groupMetadataCache = new Pool[String, GroupMetadata]
- /* group metadata cache */
- private val groupsCache = new Pool[String, GroupMetadata]
+ /* lock protecting access to loading and owned partition sets */
+ private val partitionLock = new ReentrantLock()
- /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE offsetExpireLock and the group lock if needed */
+ /* partitions of consumer groups that are being loaded; partitionLock should always be acquired BEFORE the group lock if needed */
private val loadingPartitions: mutable.Set[Int] = mutable.Set()
/* partitions of consumer groups that are assigned, using the same loading partition lock */
private val ownedPartitions: mutable.Set[Int] = mutable.Set()
- /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */
- private val offsetExpireLock = new ReentrantReadWriteLock()
-
/* shutting down flag */
private val shuttingDown = new AtomicBoolean(false)
/* number of partitions for the consumer metadata topic */
private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
- /* Single-thread scheduler to handling offset/group metadata cache loading and unloading */
+ /* single-thread scheduler to handle offset/group metadata cache loading and unloading */
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
- scheduler.startup()
- scheduler.schedule(name = "delete-expired-consumer-offsets",
- fun = deleteExpiredOffsets,
- period = config.offsetsRetentionCheckIntervalMs,
- unit = TimeUnit.MILLISECONDS)
-
newGauge("NumOffsets",
new Gauge[Int] {
- def value = offsetsCache.size
+ def value = groupMetadataCache.values.map(group => {
+ group synchronized { group.numOffsets }
+ }).sum
}
)
newGauge("NumGroups",
new Gauge[Int] {
- def value = groupsCache.size
+ def value = groupMetadataCache.size
}
)
- def currentGroups(): Iterable[GroupMetadata] = groupsCache.values
+ def enableMetadataExpiration() {
+ scheduler.startup()
+
+ scheduler.schedule(name = "delete-expired-group-metadata",
+ fun = cleanupGroupMetadata,
+ period = config.offsetsRetentionCheckIntervalMs,
+ unit = TimeUnit.MILLISECONDS)
+ }
+
+ def currentGroups(): Iterable[GroupMetadata] = groupMetadataCache.values
+
+ def isPartitionOwned(partition: Int) = inLock(partitionLock) { ownedPartitions.contains(partition) }
+
+ def isPartitionLoading(partition: Int) = inLock(partitionLock) { loadingPartitions.contains(partition) }
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
- def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId))
+ def isGroupLocal(groupId: String): Boolean = isPartitionOwned(partitionFor(groupId))
- def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
+ def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId))
- def isLoading(): Boolean = loadingPartitions synchronized loadingPartitions.nonEmpty
+ def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
/**
* Get the group associated with the given groupId, or None if not found
*/
- def getGroup(groupId: String): GroupMetadata = {
- groupsCache.get(groupId)
+ def getGroup(groupId: String): Option[GroupMetadata] = {
+ Option(groupMetadataCache.get(groupId))
}
/**
* Add a group or get the group associated with the given groupId if it already exists
*/
def addGroup(group: GroupMetadata): GroupMetadata = {
- val currentGroup = groupsCache.putIfNotExists(group.groupId, group)
+ val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group)
if (currentGroup != null) {
currentGroup
} else {
@@ -130,13 +136,15 @@ class GroupMetadataManager(val brokerId: Int,
}
/**
- * Remove all metadata associated with the group
- * @param group
+ * Remove the group from the cache and delete all metadata associated with it. This should be
+ * called only after all offsets for the group have expired and no members are remaining (i.e.
+ * it is in the Empty state).
*/
- def removeGroup(group: GroupMetadata) {
+ private def evictGroupAndDeleteMetadata(group: GroupMetadata) {
// guard this removal in case of concurrent access (e.g. if a delayed join completes with no members
- // while the group is being removed due to coordinator emigration)
- if (groupsCache.remove(group.groupId, group)) {
+ // while the group is being removed due to coordinator emigration). We also avoid writing the tombstone
+ // when the generationId is 0, since this group is only using Kafka for offset storage.
+ if (groupMetadataCache.remove(group.groupId, group) && group.generationId > 0) {
// Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
// retry removing this group.
@@ -154,7 +162,6 @@ class GroupMetadataManager(val brokerId: Int,
try {
// do not need to require acks since even if the tombstone is lost,
// it will be appended again by the new leader
- // TODO KAFKA-2720: periodic purging instead of immediate removal of groups
partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
} catch {
case t: Throwable =>
@@ -227,20 +234,20 @@ class GroupMetadataManager(val brokerId: Int,
DelayedStore(groupMetadataMessageSet, putCacheCallback)
}
- def store(delayedAppend: DelayedStore) {
+ def store(delayedStore: DelayedStore) {
// call replica manager to append the group message
replicaManager.appendMessages(
config.offsetCommitTimeoutMs.toLong,
config.offsetCommitRequiredAcks,
true, // allow appending to internal offset topic
- delayedAppend.messageSet,
- delayedAppend.callback)
+ delayedStore.messageSet,
+ delayedStore.callback)
}
/**
* Store offsets by appending them to the replicated log and then inserting them into the cache
*/
- def prepareStoreOffsets(groupId: String,
+ def prepareStoreOffsets(group: GroupMetadata,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
@@ -252,16 +259,16 @@ class GroupMetadataManager(val brokerId: Int,
// construct the message set to append
val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
- val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId))
+ val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
new Message(
- key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
+ key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
timestamp = timestamp,
magicValue = magicValue
)
}.toSeq
- val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId))
+ val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -278,29 +285,38 @@ class GroupMetadataManager(val brokerId: Int,
val status = responseStatus(offsetTopicPartition)
val responseCode =
- if (status.errorCode == Errors.NONE.code) {
- filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
- putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
+ group synchronized {
+ if (status.errorCode == Errors.NONE.code) {
+ if (!group.is(Dead)) {
+ filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+ group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+ }
+ }
+ Errors.NONE.code
+ } else {
+ if (!group.is(Dead)) {
+ filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+ group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
+ }
+ }
+
+ debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
+ .format(filteredOffsetMetadata, group.groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
+
+ // transform the log append error code to the corresponding commit status error code
+ if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
+ else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
+ Errors.NOT_COORDINATOR_FOR_GROUP.code
+ else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
+ || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
+ || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
+ Errors.INVALID_COMMIT_OFFSET_SIZE.code
+ else
+ status.errorCode
}
- Errors.NONE.code
- } else {
- debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
- .format(filteredOffsetMetadata, groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
-
- // transform the log append error code to the corresponding the commit status error code
- if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
- else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
- Errors.NOT_COORDINATOR_FOR_GROUP.code
- else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
- || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
- || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
- Errors.INVALID_COMMIT_OFFSET_SIZE.code
- else
- status.errorCode
}
-
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
@@ -313,6 +329,10 @@ class GroupMetadataManager(val brokerId: Int,
responseCallback(commitStatus)
}
+ group synchronized {
+ group.prepareOffsetCommit(offsetMetadata)
+ }
+
DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
}
@@ -320,27 +340,36 @@ class GroupMetadataManager(val brokerId: Int,
* The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log (and returns an error code).
*/
- def getOffsets(group: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
- trace("Getting offsets %s for group %s.".format(topicPartitions, group))
-
- if (isGroupLocal(group)) {
- if (topicPartitions.isEmpty) {
- // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
- offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
- (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
- }.toMap
- } else {
- topicPartitions.map { topicPartition =>
- val groupTopicPartition = GroupTopicPartition(group, topicPartition)
- (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
- }.toMap
- }
- } else {
- debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
+ def getOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
+ trace("Getting offsets %s for group %s.".format(topicPartitions, groupId))
+ val group = groupMetadataCache.get(groupId)
+ if (group == null) {
topicPartitions.map { topicPartition =>
- val groupTopicPartition = GroupTopicPartition(group, topicPartition)
- (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))
+ (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
}.toMap
+ } else {
+ group synchronized {
+ if (group.is(Dead)) {
+ topicPartitions.map { topicPartition =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
+ }.toMap
+ } else {
+ if (topicPartitions.isEmpty) {
+ // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
+ group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
+ }
+ } else {
+ topicPartitions.map { topicPartition =>
+ group.offset(topicPartition) match {
+ case None => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
+ case Some(offsetAndMetadata) =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
+ }
+ }.toMap
+ }
+ }
+ }
}
}
@@ -355,7 +384,7 @@ class GroupMetadataManager(val brokerId: Int,
def loadGroupsAndOffsets() {
info("Loading offsets and group metadata from " + topicPartition)
- loadingPartitions synchronized {
+ inLock(partitionLock) {
if (loadingPartitions.contains(offsetsPartition)) {
info("Offset load from %s already in progress.".format(topicPartition))
return
@@ -371,74 +400,70 @@ class GroupMetadataManager(val brokerId: Int,
var currOffset = log.logSegments.head.baseOffset
val buffer = ByteBuffer.allocate(config.loadBufferSize)
// loop breaks if leader changes at any time during the load, since getHighWatermark is -1
- inWriteLock(offsetExpireLock) {
- val loadedGroups = mutable.Map[String, GroupMetadata]()
- val removedGroups = mutable.Set[String]()
-
- while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
- buffer.clear()
- val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
- messages.readInto(buffer, 0)
- val messageSet = new ByteBufferMessageSet(buffer)
- messageSet.foreach { msgAndOffset =>
- require(msgAndOffset.message.key != null, "Offset entry key should not be null")
- val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
-
- if (baseKey.isInstanceOf[OffsetKey]) {
- // load offset
- val key = baseKey.key.asInstanceOf[GroupTopicPartition]
- if (msgAndOffset.message.payload == null) {
- if (offsetsCache.remove(key) != null)
- trace("Removed offset for %s due to tombstone entry.".format(key))
- else
- trace("Ignoring redundant tombstone for %s.".format(key))
- } else {
- // special handling for version 0:
- // set the expiration time stamp as commit time stamp + server default retention time
- val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
- putOffset(key, value.copy (
- expireTimestamp = {
- if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
- value.commitTimestamp + config.offsetsRetentionMs
- else
- value.expireTimestamp
- }
- ))
- trace("Loaded offset %s for %s.".format(value, key))
- }
+ val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
+ val removedOffsets = mutable.Set[GroupTopicPartition]()
+ val loadedGroups = mutable.Map[String, GroupMetadata]()
+ val removedGroups = mutable.Set[String]()
+
+ while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
+ buffer.clear()
+ val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
+ messages.readInto(buffer, 0)
+ val messageSet = new ByteBufferMessageSet(buffer)
+ messageSet.foreach { msgAndOffset =>
+ require(msgAndOffset.message.key != null, "Offset entry key should not be null")
+ val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
+
+ if (baseKey.isInstanceOf[OffsetKey]) {
+ // load offset
+ val key = baseKey.key.asInstanceOf[GroupTopicPartition]
+ if (msgAndOffset.message.payload == null) {
+ loadedOffsets.remove(key)
+ removedOffsets.add(key)
} else {
- // load group metadata
- val groupId = baseKey.key.asInstanceOf[String]
- val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
- if (groupMetadata != null) {
- trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
- removedGroups.remove(groupId)
- loadedGroups.put(groupId, groupMetadata)
- } else {
- loadedGroups.remove(groupId)
- removedGroups.add(groupId)
- }
+ val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
+ loadedOffsets.put(key, value)
+ removedOffsets.remove(key)
+ }
+ } else {
+ // load group metadata
+ val groupId = baseKey.key.asInstanceOf[String]
+ val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
+ if (groupMetadata != null) {
+ trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
+ removedGroups.remove(groupId)
+ loadedGroups.put(groupId, groupMetadata)
+ } else {
+ loadedGroups.remove(groupId)
+ removedGroups.add(groupId)
}
-
- currOffset = msgAndOffset.nextOffset
}
- }
- loadedGroups.values.foreach { group =>
- val currentGroup = addGroup(group)
- if (group != currentGroup)
- debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
- s"because there is already a cached group with generation ${currentGroup.generationId}")
- else
- onGroupLoaded(group)
+ currOffset = msgAndOffset.nextOffset
}
+ }
- removedGroups.foreach { groupId =>
- val group = groupsCache.get(groupId)
- if (group != null)
- throw new IllegalStateException(s"Unexpected unload of acitve group ${group.groupId} while " +
- s"loading partition ${topicPartition}")
- }
+ val (groupOffsets, noGroupOffsets) = loadedOffsets
+ .groupBy(_._1.group)
+ .mapValues(_.map{ case (groupTopicPartition, offsetAndMetadata) => (groupTopicPartition.topicPartition, offsetAndMetadata)})
+ .partition(value => loadedGroups.contains(value._1))
+
+ loadedGroups.values.foreach { group =>
+ val offsets = groupOffsets.getOrElse(group.groupId, Map.empty)
+ loadGroup(group, offsets)
+ onGroupLoaded(group)
+ }
+
+ noGroupOffsets.foreach { case (groupId, offsets) =>
+ val group = new GroupMetadata(groupId)
+ loadGroup(group, offsets)
+ onGroupLoaded(group)
+ }
+
+ removedGroups.foreach { groupId =>
+ if (groupMetadataCache.contains(groupId))
+ throw new IllegalStateException(s"Unexpected unload of active group ${groupId} while " +
+ s"loading partition ${topicPartition}")
}
if (!shuttingDown.get())
@@ -453,7 +478,7 @@ class GroupMetadataManager(val brokerId: Int,
error("Error in loading offsets from " + topicPartition, t)
}
finally {
- loadingPartitions synchronized {
+ inLock(partitionLock) {
ownedPartitions.add(offsetsPartition)
loadingPartitions.remove(offsetsPartition)
}
@@ -461,10 +486,37 @@ class GroupMetadataManager(val brokerId: Int,
}
}
+ private def loadGroup(group: GroupMetadata, offsets: Iterable[(TopicPartition, OffsetAndMetadata)]): Unit = {
+ val currentGroup = addGroup(group)
+ if (group != currentGroup) {
+ debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
+ s"because there is already a cached group with generation ${currentGroup.generationId}")
+ } else {
+
+ offsets.foreach {
+ case (topicPartition, offsetAndMetadata) => {
+ val offset = offsetAndMetadata.copy (
+ expireTimestamp = {
+ // special handling for version 0:
+ // set the expiration time stamp as commit time stamp + server default retention time
+ if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+ offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs
+ else
+ offsetAndMetadata.expireTimestamp
+ }
+ )
+ trace("Loaded offset %s for %s.".format(offset, topicPartition))
+ group.completePendingOffsetWrite(topicPartition, offset)
+ }
+ }
+ }
+ }
+
/**
* When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
* that partition.
- * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
+ *
+ * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
*/
def removeGroupsForPartition(offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit) {
@@ -475,31 +527,22 @@ class GroupMetadataManager(val brokerId: Int,
var numOffsetsRemoved = 0
var numGroupsRemoved = 0
- loadingPartitions synchronized {
+ inLock(partitionLock) {
// we need to guard the group removal in cache in the loading partition lock
// to prevent coordinator's check-and-get-group race condition
ownedPartitions.remove(offsetsPartition)
- // clear the offsets for this partition in the cache
-
/**
* NOTE: we need to put this in the loading partition lock as well to prevent a race condition with the leader-is-local check
* in getOffsets. This protects against fetching from an empty/cleared offset cache (i.e., one cleared due to a leader->follower
* transition right after the check), which would cause an offset fetch to return empty offsets with the NONE error code
*/
- offsetsCache.keys.foreach { key =>
- if (partitionFor(key.group) == offsetsPartition) {
- offsetsCache.remove(key)
- numOffsetsRemoved += 1
- }
- }
-
- // clear the groups for this partition in the cache
- for (group <- groupsCache.values) {
+ for (group <- groupMetadataCache.values) {
if (partitionFor(group.groupId) == offsetsPartition) {
onGroupUnloaded(group)
- groupsCache.remove(group.groupId, group)
+ groupMetadataCache.remove(group.groupId, group)
numGroupsRemoved += 1
+ numOffsetsRemoved += group.numOffsets
}
}
}
@@ -512,82 +555,53 @@ class GroupMetadataManager(val brokerId: Int,
}
}
- /**
- * Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
- *
- * @param key The requested group-topic-partition
- * @return If the key is present, return the offset and metadata; otherwise return None
- */
- private def getOffset(key: GroupTopicPartition): OffsetFetchResponse.PartitionData = {
- val offsetAndMetadata = offsetsCache.get(key)
- if (offsetAndMetadata == null)
- new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)
- else
- new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
- }
-
- /**
- * Put the (already committed) offset for the given group/topic/partition into the cache.
- *
- * @param key The group-topic-partition
- * @param offsetAndMetadata The offset/metadata to be stored
- */
- private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
- offsetsCache.put(key, offsetAndMetadata)
- }
-
- private def deleteExpiredOffsets() {
- debug("Collecting expired offsets.")
+ // visible for testing
+ private[coordinator] def cleanupGroupMetadata() {
val startMs = time.milliseconds()
-
- val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
- val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
- offsetAndMetadata.expireTimestamp < startMs
- }
-
- debug("Found %d expired offsets.".format(expiredOffsets.size))
-
- // delete the expired offsets from the table and generate tombstone messages to remove them from the log
- val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) =>
- val offsetsPartition = partitionFor(groupTopicAndPartition.group)
- trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
-
- offsetsCache.remove(groupTopicAndPartition)
-
- val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
- groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
-
- val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
- (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue))
- }.groupBy { case (partition, tombstone) => partition }
-
- // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
- // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
- tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
- val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
- partitionOpt.map { partition =>
- val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
- val messages = tombstones.map(_._2).toSeq
-
- trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
-
- try {
- // do not need to require acks since even if the tombstone is lost,
- // it will be appended again in the next purge cycle
- partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
- tombstones.size
- }
- catch {
- case t: Throwable =>
- error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
+ var offsetsRemoved = 0
+
+ groupMetadataCache.foreach { case (groupId, group) =>
+ group synchronized {
+ if (!group.is(Dead)) {
+ val offsetsPartition = partitionFor(groupId)
+
+ // delete the expired offsets from the table and generate tombstone messages to remove them from the log
+ val tombstones = group.removeExpiredOffsets(startMs).map { case (topicPartition, offsetAndMetadata) =>
+ trace("Removing expired offset and metadata for %s, %s: %s".format(groupId, topicPartition, offsetAndMetadata))
+ val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
+ val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition)
+ new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
+ }.toBuffer
+
+ val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+ partitionOpt.foreach { partition =>
+ val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
+ trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
+
+ try {
+ // do not need to require acks since even if the tombstone is lost,
+ // it will be appended again in the next purge cycle
+ partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+ offsetsRemoved += tombstones.size
+ }
+ catch {
+ case t: Throwable =>
+ error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
// ignore and continue
- 0
+ }
+ }
+
+ if (group.is(Empty) && !group.hasOffsets) {
+ group.transitionTo(Dead)
+ evictGroupAndDeleteMetadata(group)
+ info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
}
}
- }.sum
+ }
}
- info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, time.milliseconds() - startMs))
+ info("Removed %d expired offsets in %d milliseconds.".format(offsetsRemoved, time.milliseconds() - startMs))
+
}
private def getHighWatermark(partitionId: Int): Long = {
@@ -607,9 +621,11 @@ class GroupMetadataManager(val brokerId: Int,
metadata == null || metadata.length() <= config.maxMetadataSize
}
+
def shutdown() {
shuttingDown.set(true)
- scheduler.shutdown()
+ if (scheduler.isStarted)
+ scheduler.shutdown()
// TODO: clear the caches
}
@@ -642,7 +658,7 @@ class GroupMetadataManager(val brokerId: Int,
* NOTE: this is for test only
*/
def addPartitionOwnership(partition: Int) {
- loadingPartitions synchronized {
+ inLock(partitionLock) {
ownedPartitions.add(partition)
}
}
@@ -710,8 +726,8 @@ object GroupMetadataManager {
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING),
new Field("generation", INT32),
- new Field("protocol", STRING),
- new Field("leader", STRING),
+ new Field("protocol", NULLABLE_STRING),
+ new Field("leader", NULLABLE_STRING),
new Field("members", new ArrayOf(MEMBER_METADATA_V0)))
private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type")
private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation")
@@ -787,7 +803,7 @@ object GroupMetadataManager {
*
* @return key bytes for group metadata message
*/
- private def groupMetadataKey(group: String): Array[Byte] = {
+ def groupMetadataKey(group: String): Array[Byte] = {
val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
key.set(GROUP_KEY_GROUP_FIELD, group)
@@ -823,10 +839,10 @@ object GroupMetadataManager {
* @param groupMetadata
* @return payload for group metadata message
*/
- private def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
+ def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
// generate commit value with schema version 1
val value = new Struct(CURRENT_GROUP_VALUE_SCHEMA)
- value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType)
+ value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType.getOrElse(""))
value.set(GROUP_METADATA_GENERATION_V0, groupMetadata.generationId)
value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
@@ -937,13 +953,16 @@ object GroupMetadataManager {
if (version == 0) {
val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V0).asInstanceOf[String]
- val group = new GroupMetadata(groupId, protocolType)
+ val memberMetadataArray = value.getArray(GROUP_METADATA_MEMBERS_V0)
+ val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
+
+ val group = new GroupMetadata(groupId, initialState)
group.generationId = value.get(GROUP_METADATA_GENERATION_V0).asInstanceOf[Int]
group.leaderId = value.get(GROUP_METADATA_LEADER_V0).asInstanceOf[String]
group.protocol = value.get(GROUP_METADATA_PROTOCOL_V0).asInstanceOf[String]
- value.getArray(GROUP_METADATA_MEMBERS_V0).foreach {
+ memberMetadataArray.foreach {
case memberMetadataObj =>
val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
@@ -953,7 +972,7 @@ object GroupMetadataManager {
val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
- List((group.protocol, subscription)))
+ protocolType, List((group.protocol, subscription)))
member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
@@ -1012,6 +1031,9 @@ object GroupMetadataManager {
}
+case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
+ callback: Map[TopicPartition, PartitionResponse] => Unit)
+
case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
def this(group: String, topic: String, partition: Int) =
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index c57b990..19c9e8e 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -56,6 +56,7 @@ private[coordinator] class MemberMetadata(val memberId: String,
val clientId: String,
val clientHost: String,
val sessionTimeoutMs: Int,
+ val protocolType: String,
var supportedProtocols: List[(String, Array[Byte])]) {
var assignment: Array[Byte] = Array.empty[Byte]
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index dc343fa..fa13a92 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -24,7 +24,7 @@ import kafka.common.OffsetAndMetadata
import kafka.message.{Message, MessageSet}
import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig}
import kafka.utils._
-import org.apache.kafka.common.{utils, TopicPartition}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -96,7 +96,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
- groupCoordinator.startup()
+ groupCoordinator.startup(false)
// add the partition into the owned partition list
groupPartitionId = groupCoordinator.partitionFor(groupId)
@@ -106,7 +106,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
@After
def tearDown() {
EasyMock.reset(replicaManager)
- groupCoordinator.shutdown()
+ if (groupCoordinator != null)
+ groupCoordinator.shutdown()
}
@Test