You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/18 03:34:56 UTC
kafka git commit: KAFKA-2841;
safe group metadata cache loading/unloading
Repository: kafka
Updated Branches:
refs/heads/trunk 06d2c7816 -> dbdec927b
KAFKA-2841; safe group metadata cache loading/unloading
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #530 from hachikuji/KAFKA-2841
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dbdec927
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dbdec927
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dbdec927
Branch: refs/heads/trunk
Commit: dbdec927b9badcdbe4a4b4b6ccebf5044ca32747
Parents: 06d2c78
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Nov 17 18:34:51 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Nov 17 18:34:51 2015 -0800
----------------------------------------------------------------------
.../kafka/coordinator/GroupCoordinator.scala | 72 ++++--
.../coordinator/GroupMetadataManager.scala | 221 ++++++++++---------
.../kafka/server/DelayedOperationKey.scala | 4 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 27 +--
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../scala/kafka/server/ReplicaManager.scala | 18 +-
core/src/main/scala/kafka/utils/Pool.scala | 4 +-
.../GroupCoordinatorResponseTest.scala | 2 +-
8 files changed, 200 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 23309c1..2f1b842 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -63,9 +63,8 @@ class GroupCoordinator(val brokerId: Int,
groupConfig: GroupConfig,
offsetConfig: OffsetConfig,
replicaManager: ReplicaManager,
- zkUtils: ZkUtils,
- scheduler: Scheduler) = this(brokerId, groupConfig, offsetConfig,
- new GroupMetadataManager(brokerId, offsetConfig, replicaManager, zkUtils, scheduler))
+ zkUtils: ZkUtils) = this(brokerId, groupConfig, offsetConfig,
+ new GroupMetadataManager(brokerId, offsetConfig, replicaManager, zkUtils))
def offsetsTopicConfigs: Properties = {
val props = new Properties
@@ -132,7 +131,7 @@ class GroupCoordinator(val brokerId: Int,
if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
} else {
- group = groupManager.addGroup(groupId, protocolType)
+ group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))
doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
}
} else {
@@ -226,7 +225,7 @@ class GroupCoordinator(val brokerId: Int,
}
if (group.is(PreparingRebalance))
- joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+ joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
}
@@ -473,12 +472,49 @@ class GroupCoordinator(val brokerId: Int,
}
}
- def handleGroupImmigration(offsetTopicPartitionId: Int) = {
- groupManager.loadGroupsForPartition(offsetTopicPartitionId)
+ private def onGroupUnloaded(group: GroupMetadata) {
+ group synchronized {
+ info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
+ val previousState = group.currentState
+ group.transitionTo(Dead)
+
+ previousState match {
+ case Dead =>
+ case PreparingRebalance =>
+ for (member <- group.allMemberMetadata) {
+ if (member.awaitingJoinCallback != null) {
+ member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
+ member.awaitingJoinCallback = null
+ }
+ }
+ joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+
+ case Stable | AwaitingSync =>
+ for (member <- group.allMemberMetadata) {
+ if (member.awaitingSyncCallback != null) {
+ member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code)
+ member.awaitingSyncCallback = null
+ }
+ heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
+ }
+ }
+ }
}
- def handleGroupEmigration(offsetTopicPartitionId: Int) = {
- groupManager.removeGroupsForPartition(offsetTopicPartitionId)
+ private def onGroupLoaded(group: GroupMetadata) {
+ group synchronized {
+ info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
+ assert(group.is(Stable))
+ group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
+ }
+ }
+
+ def handleGroupImmigration(offsetTopicPartitionId: Int) {
+ groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
+ }
+
+ def handleGroupEmigration(offsetTopicPartitionId: Int) {
+ groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
}
private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
@@ -528,7 +564,7 @@ class GroupCoordinator(val brokerId: Int,
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
// complete current heartbeat expectation
member.latestHeartbeat = SystemTime.milliseconds
- val memberKey = ConsumerKey(member.groupId, member.memberId)
+ val memberKey = MemberKey(member.groupId, member.memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
// reschedule the next heartbeat expiration deadline
@@ -539,8 +575,8 @@ class GroupCoordinator(val brokerId: Int,
private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
member.isLeaving = true
- val consumerKey = ConsumerKey(member.groupId, member.memberId)
- heartbeatPurgatory.checkAndComplete(consumerKey)
+ val memberKey = MemberKey(member.groupId, member.memberId)
+ heartbeatPurgatory.checkAndComplete(memberKey)
}
private def addMemberAndRebalance(sessionTimeoutMs: Int,
@@ -584,8 +620,8 @@ class GroupCoordinator(val brokerId: Int,
val rebalanceTimeout = group.rebalanceTimeout
val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
- val consumerGroupKey = ConsumerGroupKey(group.groupId)
- joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey))
+ val groupKey = GroupKey(group.groupId)
+ joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
@@ -594,7 +630,7 @@ class GroupCoordinator(val brokerId: Int,
group.currentState match {
case Dead =>
case Stable | AwaitingSync => maybePrepareRebalance(group)
- case PreparingRebalance => joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+ case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
@@ -621,6 +657,7 @@ class GroupCoordinator(val brokerId: Int,
// TODO KAFKA-2720: only remove group in the background thread
if (group.isEmpty) {
+ group.transitionTo(Dead)
groupManager.removeGroup(group)
info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
}
@@ -694,8 +731,7 @@ object GroupCoordinator {
def create(config: KafkaConfig,
zkUtils: ZkUtils,
- replicaManager: ReplicaManager,
- scheduler: Scheduler): GroupCoordinator = {
+ replicaManager: ReplicaManager): GroupCoordinator = {
val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
@@ -707,7 +743,7 @@ object GroupCoordinator {
val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
- new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, scheduler)
+ new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils)
}
def create(config: KafkaConfig,
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 027abf7..a63f226 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -42,7 +42,7 @@ import scala.collection._
import java.io.PrintStream
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Gauge
@@ -53,8 +53,7 @@ case class DelayedStore(messageSet: Map[TopicAndPartition, MessageSet],
class GroupMetadataManager(val brokerId: Int,
val config: OffsetConfig,
replicaManager: ReplicaManager,
- zkUtils: ZkUtils,
- scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
+ zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
/* offsets cache */
private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@@ -77,8 +76,12 @@ class GroupMetadataManager(val brokerId: Int,
/* number of partitions for the consumer metadata topic */
private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
+ /* Single-thread scheduler to handle offset/group metadata cache loading and unloading */
+ private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
+
this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
+ scheduler.startup()
scheduler.schedule(name = "delete-expired-consumer-offsets",
fun = deleteExpiredOffsets,
period = config.offsetsRetentionCheckIntervalMs,
@@ -116,55 +119,45 @@ class GroupMetadataManager(val brokerId: Int,
/**
* Add a group or get the group associated with the given groupId if it already exists
*/
- def addGroup(groupId: String, protocolType: String): GroupMetadata = {
- val newGroup = new GroupMetadata(groupId, protocolType)
- val currentGroup = groupsCache.putIfNotExists(groupId, newGroup)
- if (currentGroup != null)
+ def addGroup(group: GroupMetadata): GroupMetadata = {
+ val currentGroup = groupsCache.putIfNotExists(group.groupId, group)
+ if (currentGroup != null) {
currentGroup
- else
- newGroup
- }
-
- /**
- * Update the current cached metadata for the group with the given groupId or add the group if there is none.
- */
- private def updateGroup(groupId: String, group: GroupMetadata) {
- groupsCache.put(groupId, group)
+ } else {
+ group
+ }
}
/**
- * Remove all metadata associated with the group, note this function needs to be
- * called inside the group lock
+ * Remove all metadata associated with the group
* @param group
*/
def removeGroup(group: GroupMetadata) {
- // first mark the group as dead
- group.transitionTo(Dead)
-
- if (groupsCache.remove(group.groupId) != group)
- throw new IllegalArgumentException("Cannot remove group " + group.groupId + " since it has been replaced.")
-
- // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
- // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
- // retry removing this group.
- val groupPartition = partitionFor(group.groupId)
- val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId))
-
- val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
- partitionOpt.foreach { partition =>
- val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
-
- trace("Marking group %s as deleted.".format(group.groupId))
-
- try {
- // do not need to require acks since even if the tombstone is lost,
- // it will be appended again by the new leader
- // TODO KAFKA-2720: periodic purging instead of immediate removal of groups
- partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
- } catch {
- case t: Throwable =>
- error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
+ // guard this removal in case of concurrent access (e.g. if a delayed join completes with no members
+ // while the group is being removed due to coordinator emigration)
+ if (groupsCache.remove(group.groupId, group)) {
+ // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+ // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+ // retry removing this group.
+ val groupPartition = partitionFor(group.groupId)
+ val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId))
+
+ val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+ partitionOpt.foreach { partition =>
+ val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+
+ trace("Marking group %s as deleted.".format(group.groupId))
+
+ try {
+ // do not need to require acks since even if the tombstone is lost,
+ // it will be appended again by the new leader
+ // TODO KAFKA-2720: periodic purging instead of immediate removal of groups
+ partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
+ } catch {
+ case t: Throwable =>
+ error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
// ignore and continue
+ }
}
}
}
@@ -346,23 +339,22 @@ class GroupMetadataManager(val brokerId: Int,
/**
* Asynchronously read the partition from the offsets topic and populate the cache
*/
- def loadGroupsForPartition(offsetsPartition: Int) {
-
+ def loadGroupsForPartition(offsetsPartition: Int,
+ onGroupLoaded: GroupMetadata => Unit) {
val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+ scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
- loadingPartitions synchronized {
- ownedPartitions.add(offsetsPartition)
+ def loadGroupsAndOffsets() {
+ info("Loading offsets and group metadata from " + topicPartition)
- if (loadingPartitions.contains(offsetsPartition)) {
- info("Offset load from %s already in progress.".format(topicPartition))
- } else {
- loadingPartitions.add(offsetsPartition)
- scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
+ loadingPartitions synchronized {
+ if (loadingPartitions.contains(offsetsPartition)) {
+ info("Offset load from %s already in progress.".format(topicPartition))
+ return
+ } else {
+ loadingPartitions.add(offsetsPartition)
+ }
}
- }
-
- def loadGroupsAndOffsets() {
- info("Loading offsets from " + topicPartition)
val startMs = SystemTime.milliseconds
try {
@@ -372,6 +364,9 @@ class GroupMetadataManager(val brokerId: Int,
val buffer = ByteBuffer.allocate(config.loadBufferSize)
// loop breaks if leader changes at any time during the load, since getHighWatermark is -1
inWriteLock(offsetExpireLock) {
+ val loadedGroups = mutable.Map[String, GroupMetadata]()
+ val removedGroups = mutable.Set[String]()
+
while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
buffer.clear()
val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
@@ -409,21 +404,33 @@ class GroupMetadataManager(val brokerId: Int,
val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
if (groupMetadata != null) {
trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
- updateGroup(groupId, groupMetadata)
+ removedGroups.remove(groupId)
+ loadedGroups.put(groupId, groupMetadata)
} else {
- // this is a tombstone mark, we need to delete the group from cache if it exists
- val group = groupsCache.remove(groupId)
- if (group != null) {
- group synchronized {
- group.transitionTo(Dead)
- }
- }
+ loadedGroups.remove(groupId)
+ removedGroups.add(groupId)
}
}
currOffset = msgAndOffset.nextOffset
}
}
+
+ loadedGroups.values.foreach { group =>
+ val currentGroup = addGroup(group)
+ if (group != currentGroup)
+ debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
+ s"because there is already a cached group with generation ${currentGroup.generationId}")
+ else
+ onGroupLoaded(group)
+ }
+
+ removedGroups.foreach { groupId =>
+ val group = groupsCache.get(groupId)
+ if (group != null)
+ throw new IllegalStateException(s"Unexpected unload of acitve group ${group.groupId} while " +
+ s"loading partition ${topicPartition}")
+ }
}
if (!shuttingDown.get())
@@ -438,7 +445,10 @@ class GroupMetadataManager(val brokerId: Int,
error("Error in loading offsets from " + topicPartition, t)
}
finally {
- loadingPartitions synchronized loadingPartitions.remove(offsetsPartition)
+ loadingPartitions synchronized {
+ ownedPartitions.add(offsetsPartition)
+ loadingPartitions.remove(offsetsPartition)
+ }
}
}
}
@@ -448,48 +458,48 @@ class GroupMetadataManager(val brokerId: Int,
* that partition.
* @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
*/
- def removeGroupsForPartition(offsetsPartition: Int) {
- var numOffsetsRemoved = 0
- var numGroupsRemoved = 0
-
- loadingPartitions synchronized {
- // we need to guard the group removal in cache in the loading partition lock
- // to prevent coordinator's check-and-get-group race condition
- ownedPartitions.remove(offsetsPartition)
-
- // clear the offsets for this partition in the cache
-
- /**
- * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check
- * in getOffsets to protects against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower
- * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code
- */
- offsetsCache.keys.foreach { key =>
- if (partitionFor(key.group) == offsetsPartition) {
- offsetsCache.remove(key)
- numOffsetsRemoved += 1
+ def removeGroupsForPartition(offsetsPartition: Int,
+ onGroupUnloaded: GroupMetadata => Unit) {
+ val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+ scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
+
+ def removeGroupsAndOffsets() {
+ var numOffsetsRemoved = 0
+ var numGroupsRemoved = 0
+
+ loadingPartitions synchronized {
+ // we need to guard the group removal in cache in the loading partition lock
+ // to prevent coordinator's check-and-get-group race condition
+ ownedPartitions.remove(offsetsPartition)
+
+ // clear the offsets for this partition in the cache
+
+ /**
+ * NOTE: we need to put this in the loading partition lock as well to prevent race condition of the leader-is-local check
+ * in getOffsets to protect against fetching from an empty/cleared offset cache (i.e., cleared due to a leader->follower
+ * transition right after the check and clear the cache), causing offset fetch return empty offsets with NONE error code
+ */
+ offsetsCache.keys.foreach { key =>
+ if (partitionFor(key.group) == offsetsPartition) {
+ offsetsCache.remove(key)
+ numOffsetsRemoved += 1
+ }
}
- }
-
- // clear the groups for this partition in the cache
- for (group <- groupsCache.values) {
- group synchronized {
- // mark the group as dead and then remove it from cache
- group.transitionTo(Dead)
-
- if (groupsCache.remove(group.groupId) != group)
- throw new IllegalArgumentException("Cannot remove group " + group.groupId + " since it has been replaced.")
+ // clear the groups for this partition in the cache
+ for (group <- groupsCache.values) {
+ onGroupUnloaded(group)
+ groupsCache.remove(group.groupId, group)
numGroupsRemoved += 1
}
}
- }
- if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
- .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+ if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
+ .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
- if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
- .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+ if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
+ .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+ }
}
/**
@@ -588,6 +598,7 @@ class GroupMetadataManager(val brokerId: Int,
def shutdown() {
shuttingDown.set(true)
+ scheduler.shutdown()
// TODO: clear the caches
}
@@ -850,7 +861,7 @@ object GroupMetadataManager {
// version 2 refers to offset
val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]
- GroupKey(version, group)
+ GroupMetadataKey(version, group)
} else {
throw new IllegalStateException("Unknown version " + version + " for group metadata message")
}
@@ -960,8 +971,8 @@ object GroupMetadataManager {
val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))
// only print if the message is a group metadata record
- if (formattedKey.isInstanceOf[GroupKey]) {
- val groupId = formattedKey.asInstanceOf[GroupKey].key
+ if (formattedKey.isInstanceOf[GroupMetadataKey]) {
+ val groupId = formattedKey.asInstanceOf[GroupMetadataKey].key
val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
output.write(groupId.getBytes)
output.write("::".getBytes)
@@ -991,7 +1002,7 @@ case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
override def toString = key.toString
}
-case class GroupKey(version: Short, key: String) extends BaseKey {
+case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
override def toString = key
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index c122bde..f005019 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -39,13 +39,13 @@ case class TopicPartitionOperationKey(topic: String, partition: Int) extends Del
}
/* used by delayed-join-group operations */
-case class ConsumerKey(groupId: String, consumerId: String) extends DelayedOperationKey {
+case class MemberKey(groupId: String, consumerId: String) extends DelayedOperationKey {
override def keyLabel = "%s-%s".format(groupId, consumerId)
}
/* used by delayed-rebalance operations */
-case class ConsumerGroupKey(groupId: String) extends DelayedOperationKey {
+case class GroupKey(groupId: String) extends DelayedOperationKey {
override def keyLabel = groupId
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d1c6f79..bb50e40 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,6 +22,7 @@ import java.util
import kafka.admin.AdminUtils
import kafka.api._
+import kafka.cluster.Partition
import kafka.common._
import kafka.controller.KafkaController
import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
@@ -112,23 +113,23 @@ class KafkaApis(val requestChannel: RequestChannel,
authorizeClusterAction(request)
try {
- // call replica manager to handle updating partitions to become leader or follower
- val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache)
- val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)
- // for each new leader or follower, call coordinator to handle
- // consumer group migration
- result.updatedLeaders.foreach { case partition =>
- if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
- coordinator.handleGroupImmigration(partition.partitionId)
- }
- result.updatedFollowers.foreach { case partition =>
- partition.leaderReplicaIdOpt.foreach { leaderReplica =>
- if (partition.topic == GroupCoordinator.GroupMetadataTopicName &&
- leaderReplica == brokerId)
+ def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
+ // for each new leader or follower, call coordinator to handle consumer group migration.
+ // this callback is invoked under the replica state change lock to ensure proper order of
+ // leadership changes
+ updatedLeaders.foreach { partition =>
+ if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
+ coordinator.handleGroupImmigration(partition.partitionId)
+ }
+ updatedFollowers.foreach { partition =>
+ if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
coordinator.handleGroupEmigration(partition.partitionId)
}
}
+ // call replica manager to handle updating partitions to become leader or follower
+ val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache, onLeadershipChange)
+ val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse)))
} catch {
case e: KafkaStorageException =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 80cc6f1..e8ea204 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -187,7 +187,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
kafkaController.startup()
/* start kafka coordinator */
- consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
+ consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
consumerCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0dde914..a4553b3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -79,14 +79,10 @@ object LogReadResult {
false)
}
-case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short],
- updatedLeaders: Set[Partition],
- updatedFollowers: Set[Partition],
- errorCode: Short) {
+case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short], errorCode: Short) {
override def toString = {
- "updated leaders: [%s], updated followers: [%s], update results: [%s], global error: [%d]"
- .format(updatedLeaders, updatedFollowers, responseMap, errorCode)
+ "update results: [%s], global error: [%d]".format(responseMap, errorCode)
}
}
@@ -583,7 +579,9 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, metadataCache: MetadataCache): BecomeLeaderOrFollowerResult = {
+ def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
+ metadataCache: MetadataCache,
+ onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
.format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
@@ -597,7 +595,7 @@ class ReplicaManager(val config: KafkaConfig,
"its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
}
- BecomeLeaderOrFollowerResult(responseMap, Set.empty[Partition], Set.empty[Partition], ErrorMapping.StaleControllerEpochCode)
+ BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
val controllerId = leaderAndISRRequest.controllerId
val correlationId = leaderAndISRRequest.correlationId
@@ -651,7 +649,9 @@ class ReplicaManager(val config: KafkaConfig,
hwThreadInitialized = true
}
replicaFetcherManager.shutdownIdleFetcherThreads()
- BecomeLeaderOrFollowerResult(responseMap, partitionsBecomeLeader, partitionsBecomeFollower, ErrorMapping.NoError)
+
+ onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
+ BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.NoError)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 9ddcde7..beeab0f 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -71,7 +71,9 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
def get(key: K): V = pool.get(key)
def remove(key: K): V = pool.remove(key)
-
+
+ def remove(key: K, value: V): Boolean = pool.remove(key, value)
+
def keys: mutable.Set[K] = {
import JavaConversions._
pool.keySet()
http://git-wip-us.apache.org/repos/asf/kafka/blob/dbdec927/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 5be410c..0f702a0 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -87,7 +87,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
EasyMock.replay(zkUtils)
- groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime()))
+ groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager)
groupCoordinator.startup()
// add the partition into the owned partition list