Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:47 UTC
[3/8] kafka git commit: KAFKA-2464: client-side assignment for new consumer
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
new file mode 100644
index 0000000..ef94289
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -0,0 +1,632 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
+import kafka.log.LogConfig
+import kafka.message.UncompressedCodec
+import kafka.server._
+import kafka.utils._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.JoinGroupRequest
+
+import scala.collection.{Map, Seq, immutable}
+
+case class GroupManagerConfig(groupMinSessionTimeoutMs: Int,
+ groupMaxSessionTimeoutMs: Int)
+
+case class JoinGroupResult(members: Map[String, Array[Byte]],
+ memberId: String,
+ generationId: Int,
+ subProtocol: String,
+ leaderId: String,
+ errorCode: Short)
+
+/**
+ * GroupCoordinator handles general group membership and offset management.
+ *
+ * Each Kafka server instantiates a coordinator which is responsible for a set of
+ * groups. Groups are assigned to coordinators based on their group names.
+ */
+class GroupCoordinator(val brokerId: Int,
+ val groupConfig: GroupManagerConfig,
+ val offsetConfig: OffsetManagerConfig,
+ private val offsetManager: OffsetManager) extends Logging {
+ type JoinCallback = JoinGroupResult => Unit
+ type SyncCallback = (Array[Byte], Short) => Unit
+
+ this.logIdent = "[GroupCoordinator " + brokerId + "]: "
+
+ private val isActive = new AtomicBoolean(false)
+
+ private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
+ private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null
+ private var coordinatorMetadata: CoordinatorMetadata = null
+
+ def this(brokerId: Int,
+ groupConfig: GroupManagerConfig,
+ offsetConfig: OffsetManagerConfig,
+ replicaManager: ReplicaManager,
+ zkUtils: ZkUtils,
+ scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
+ new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler))
+
+ def offsetsTopicConfigs: Properties = {
+ val props = new Properties
+ props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
+ props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
+ props
+ }
+
+ /**
+ * NOTE: If a group lock and metadataLock are simultaneously needed,
+ * be sure to acquire the group lock before metadataLock to prevent deadlock
+ */
+
+ /**
+ * Startup logic executed when the server starts up.
+ */
+ def startup() {
+ info("Starting up.")
+ heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
+ joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
+ coordinatorMetadata = new CoordinatorMetadata(brokerId)
+ isActive.set(true)
+ info("Startup complete.")
+ }
+
+ /**
+ * Shutdown logic executed when the server shuts down.
+ * The ordering of actions should be the reverse of the startup process.
+ */
+ def shutdown() {
+ info("Shutting down.")
+ isActive.set(false)
+ offsetManager.shutdown()
+ coordinatorMetadata.shutdown()
+ heartbeatPurgatory.shutdown()
+ joinPurgatory.shutdown()
+ info("Shutdown complete.")
+ }
+
+ def handleJoinGroup(groupId: String,
+ memberId: String,
+ sessionTimeoutMs: Int,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ responseCallback: JoinCallback) {
+ if (!isActive.get) {
+ responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
+ } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
+ sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
+ responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))
+ } else {
+ // only try to create the group if the group is unknown AND
+ // the member id is UNKNOWN; if a member id is specified but the group does not
+ // exist, we should reject the request
+ var group = coordinatorMetadata.getGroup(groupId)
+ if (group == null) {
+ if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+ } else {
+ group = coordinatorMetadata.addGroup(groupId, protocolType)
+ doJoinGroup(group, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ }
+ } else {
+ doJoinGroup(group, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+ }
+ }
+ }
+
+ private def doJoinGroup(group: GroupMetadata,
+ memberId: String,
+ sessionTimeoutMs: Int,
+ protocolType: String,
+ protocols: List[(String, Array[Byte])],
+ responseCallback: JoinCallback) {
+ group synchronized {
+ if (group.protocolType != protocolType || !group.supportsProtocols(protocols.map(_._1).toSet)) {
+ // if the new member does not support the group protocol, reject it
+ responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
+ } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
+ // if the member is trying to register with an unrecognized id, send the response to let
+ // it reset its member id and retry
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+ } else {
+ group.currentState match {
+ case Dead =>
+ // if the group is marked as dead, it means some other thread has just removed the group
+ // from the coordinator metadata; it is likely that the group has migrated to another
+ // coordinator OR the group is in a transient unstable phase. Let the member retry
+ // joining without the specified member id.
+ responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+
+ case PreparingRebalance =>
+ if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+ } else {
+ val member = group.get(memberId)
+ updateMemberAndRebalance(group, member, protocols, responseCallback)
+ }
+
+ case AwaitingSync =>
+ if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+ } else {
+ val member = group.get(memberId)
+ if (member.matches(protocols)) {
+ // member is joining with the same metadata (which could be because it failed to
+ // receive the initial JoinGroup response), so just return current group information
+ // for the current generation.
+ responseCallback(JoinGroupResult(
+ members = if (memberId == group.leaderId) {
+ group.currentMemberMetadata
+ } else {
+ Map.empty
+ },
+ memberId = memberId,
+ generationId = group.generationId,
+ subProtocol = group.protocol,
+ leaderId = group.leaderId,
+ errorCode = Errors.NONE.code))
+ } else {
+ // member has changed metadata, so force a rebalance
+ updateMemberAndRebalance(group, member, protocols, responseCallback)
+ }
+ }
+
+ case Stable =>
+ if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
+ // if the member id is unknown, register the member to the group
+ addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+ } else {
+ val member = group.get(memberId)
+ if (memberId == group.leaderId || !member.matches(protocols)) {
+ // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
+ // The latter allows the leader to trigger rebalances for changes affecting assignment
+ // which do not affect the member metadata (such as topic metadata changes for the consumer)
+ updateMemberAndRebalance(group, member, protocols, responseCallback)
+ } else {
+ // for followers with no actual change to their metadata, just return group information
+ // for the current generation which will allow them to issue SyncGroup
+ responseCallback(JoinGroupResult(
+ members = Map.empty,
+ memberId = memberId,
+ generationId = group.generationId,
+ subProtocol = group.protocol,
+ leaderId = group.leaderId,
+ errorCode = Errors.NONE.code))
+ }
+ }
+ }
+
+ if (group.is(PreparingRebalance))
+ joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+ }
+ }
+ }
+
+ def handleSyncGroup(groupId: String,
+ generation: Int,
+ memberId: String,
+ groupAssignment: Map[String, Array[Byte]],
+ responseCallback: SyncCallback) {
+ if (!isActive.get) {
+ responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+ } else {
+ val group = coordinatorMetadata.getGroup(groupId)
+ if (group == null)
+ responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+ else
+ doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
+ }
+ }
+
+ private def doSyncGroup(group: GroupMetadata,
+ generationId: Int,
+ memberId: String,
+ groupAssignment: Map[String, Array[Byte]],
+ responseCallback: SyncCallback) {
+ group synchronized {
+ if (!group.has(memberId)) {
+ responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+ } else if (generationId != group.generationId) {
+ responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)
+ } else {
+ group.currentState match {
+ case Dead =>
+ responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+
+ case PreparingRebalance =>
+ responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)
+
+ case AwaitingSync =>
+ group.get(memberId).awaitingSyncCallback = responseCallback
+ completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
+
+ // if this is the leader, then we can transition to stable and
+ // propagate the assignment to any awaiting members
+ if (memberId == group.leaderId) {
+ group.transitionTo(Stable)
+ propagateAssignment(group, groupAssignment)
+ }
+
+ case Stable =>
+ // if the group is stable, we just return the current assignment
+ val memberMetadata = group.get(memberId)
+ responseCallback(memberMetadata.assignment, Errors.NONE.code)
+ completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
+ }
+ }
+ }
+ }
+
+ def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
+ if (!isActive.get) {
+ responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
+ } else {
+ val group = coordinatorMetadata.getGroup(groupId)
+ if (group == null) {
+ // if the group is marked as dead, it means some other thread has just removed the group
+ // from the coordinator metadata; it is likely that the group has migrated to another
+ // coordinator OR the group is in a transient unstable phase. Let the consumer retry
+ // joining without a specified consumer id.
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else {
+ group synchronized {
+ if (group.is(Dead)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else if (!group.has(consumerId)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else {
+ val member = group.get(consumerId)
+ removeHeartbeatForLeavingMember(group, member)
+ onMemberFailure(group, member)
+ responseCallback(Errors.NONE.code)
+ }
+ }
+ }
+ }
+ }
+
+ def handleHeartbeat(groupId: String,
+ memberId: String,
+ generationId: Int,
+ responseCallback: Short => Unit) {
+ if (!isActive.get) {
+ responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
+ } else {
+ val group = coordinatorMetadata.getGroup(groupId)
+ if (group == null) {
+ // if the group is marked as dead, it means some other thread has just removed the group
+ // from the coordinator metadata; it is likely that the group has migrated to another
+ // coordinator OR the group is in a transient unstable phase. Let the member retry
+ // joining without the specified member id.
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else {
+ group synchronized {
+ if (group.is(Dead)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else if (!group.is(Stable)) {
+ responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+ } else if (!group.has(memberId)) {
+ responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+ } else if (generationId != group.generationId) {
+ responseCallback(Errors.ILLEGAL_GENERATION.code)
+ } else {
+ val member = group.get(memberId)
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ responseCallback(Errors.NONE.code)
+ }
+ }
+ }
+ }
+ }
+
+ def handleCommitOffsets(groupId: String,
+ memberId: String,
+ generationId: Int,
+ offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+ if (!isActive.get) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
+ } else if (!isCoordinatorForGroup(groupId)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code))
+ } else {
+ val group = coordinatorMetadata.getGroup(groupId)
+ if (group == null) {
+ if (generationId < 0)
+ // the group is not relying on Kafka for partition management, so allow the commit
+ offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
+ else
+ // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
+ // or this is a request coming from an older generation. either way, reject the commit
+ responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+ } else {
+ group synchronized {
+ if (group.is(Dead)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
+ } else if (group.is(AwaitingSync)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
+ } else if (!group.has(memberId)) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
+ } else if (generationId != group.generationId) {
+ responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+ } else {
+ offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
+ }
+ }
+ }
+ }
+ }
+
+ def handleFetchOffsets(groupId: String,
+ partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+ if (!isActive.get) {
+ partitions.map { case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupCoordinatorNotAvailable) }.toMap
+ } else if (!isCoordinatorForGroup(groupId)) {
+ partitions.map { case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup) }.toMap
+ } else {
+ // return offsets blindly regardless of the current group state, since the group may be using
+ // Kafka commit storage without automatic group management
+ offsetManager.getOffsets(groupId, partitions)
+ }
+ }
+
+ def handleGroupImmigration(offsetTopicPartitionId: Int) = {
+ // TODO we may need to add more logic in KAFKA-2017
+ offsetManager.loadOffsetsFromLog(offsetTopicPartitionId)
+ }
+
+ def handleGroupEmigration(offsetTopicPartitionId: Int) = {
+ // TODO we may need to add more logic in KAFKA-2017
+ offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId)
+ }
+
+ private def joinError(memberId: String, errorCode: Short): JoinGroupResult = {
+ JoinGroupResult(
+ members = Map.empty,
+ memberId = memberId,
+ generationId = 0,
+ subProtocol = GroupCoordinator.NoProtocol,
+ leaderId = GroupCoordinator.NoLeader,
+ errorCode = errorCode)
+ }
+
+ private def propagateAssignment(group: GroupMetadata,
+ assignment: Map[String, Array[Byte]]) {
+ for (member <- group.allMembers) {
+ member.assignment = assignment.getOrElse(member.memberId, Array.empty[Byte])
+ if (member.awaitingSyncCallback != null) {
+ member.awaitingSyncCallback(member.assignment, Errors.NONE.code)
+ member.awaitingSyncCallback = null
+ }
+ }
+ }
+
+ /**
+ * Complete existing DelayedHeartbeats for the given member and schedule the next one
+ */
+ private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+ // complete current heartbeat expectation
+ member.latestHeartbeat = SystemTime.milliseconds
+ val memberKey = ConsumerKey(member.groupId, member.memberId)
+ heartbeatPurgatory.checkAndComplete(memberKey)
+
+ // reschedule the next heartbeat expiration deadline
+ val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
+ val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
+ heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
+ }
+
+ private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
+ member.isLeaving = true
+ val consumerKey = ConsumerKey(member.groupId, member.memberId)
+ heartbeatPurgatory.checkAndComplete(consumerKey)
+ }
+
+ private def addMemberAndRebalance(sessionTimeoutMs: Int,
+ protocols: List[(String, Array[Byte])],
+ group: GroupMetadata,
+ callback: JoinCallback) = {
+ val memberId = group.generateNextMemberId
+ val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols)
+ member.awaitingJoinCallback = callback
+ group.add(member.memberId, member)
+ maybePrepareRebalance(group)
+ member
+ }
+
+ private def updateMemberAndRebalance(group: GroupMetadata,
+ member: MemberMetadata,
+ protocols: List[(String, Array[Byte])],
+ callback: JoinCallback) {
+ member.supportedProtocols = protocols
+ member.awaitingJoinCallback = callback
+ maybePrepareRebalance(group)
+ }
+
+ private def maybePrepareRebalance(group: GroupMetadata) {
+ group synchronized {
+ if (group.canRebalance)
+ prepareRebalance(group)
+ }
+ }
+
+ private def prepareRebalance(group: GroupMetadata) {
+ // if any members are awaiting sync, cancel their request and have them rejoin
+ if (group.is(AwaitingSync)) {
+ for (member <- group.allMembers) {
+ if (member.awaitingSyncCallback != null) {
+ member.awaitingSyncCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)
+ member.awaitingSyncCallback = null
+ }
+ }
+ }
+
+ group.allMembers.foreach(_.assignment = null)
+ group.transitionTo(PreparingRebalance)
+ info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
+
+ val rebalanceTimeout = group.rebalanceTimeout
+ val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
+ val consumerGroupKey = ConsumerGroupKey(group.groupId)
+ joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(consumerGroupKey))
+ }
+
+ private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
+ trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
+ group.remove(member.memberId)
+ group.currentState match {
+ case Dead =>
+ case Stable | AwaitingSync => maybePrepareRebalance(group)
+ case PreparingRebalance => joinPurgatory.checkAndComplete(ConsumerGroupKey(group.groupId))
+ }
+ }
+
+ def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
+ group synchronized {
+ if (group.notYetRejoinedMembers.isEmpty)
+ forceComplete()
+ else false
+ }
+ }
+
+ def onExpireJoin() {
+ // TODO: add metrics for restabilize timeouts
+ }
+
+ def onCompleteJoin(group: GroupMetadata) {
+ group synchronized {
+ val failedMembers = group.notYetRejoinedMembers
+ if (group.isEmpty || !failedMembers.isEmpty) {
+ failedMembers.foreach { failedMember =>
+ group.remove(failedMember.memberId)
+ // TODO: cut the socket connection to the client
+ }
+
+ if (group.isEmpty) {
+ group.transitionTo(Dead)
+ info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+ coordinatorMetadata.removeGroup(group.groupId)
+ }
+ }
+ if (!group.is(Dead)) {
+ group.initNextGeneration
+ info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
+
+ // trigger the awaiting join group response callback for all the members after rebalancing
+ for (member <- group.allMembers) {
+ assert(member.awaitingJoinCallback != null)
+ val joinResult = JoinGroupResult(
+ members = if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
+ memberId = member.memberId,
+ generationId = group.generationId,
+ subProtocol = group.protocol,
+ leaderId = group.leaderId,
+ errorCode = Errors.NONE.code)
+
+ member.awaitingJoinCallback(joinResult)
+ member.awaitingJoinCallback = null
+ completeAndScheduleNextHeartbeatExpiration(group, member)
+ }
+ }
+ }
+ }
+
+ def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
+ group synchronized {
+ if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
+ forceComplete()
+ else false
+ }
+ }
+
+ def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
+ group synchronized {
+ if (!shouldKeepMemberAlive(member, heartbeatDeadline))
+ onMemberFailure(group, member)
+ }
+ }
+
+ def onCompleteHeartbeat() {
+ // TODO: add metrics for complete heartbeats
+ }
+
+ def partitionFor(group: String): Int = offsetManager.partitionFor(group)
+
+ private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
+ member.awaitingJoinCallback != null ||
+ member.awaitingSyncCallback != null ||
+ member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
+
+ private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
+}
+
+object GroupCoordinator {
+
+ val NoProtocol = ""
+ val NoLeader = ""
+ val OffsetsTopicName = "__consumer_offsets"
+
+ def create(config: KafkaConfig,
+ zkUtils: ZkUtils,
+ replicaManager: ReplicaManager,
+ kafkaScheduler: KafkaScheduler): GroupCoordinator = {
+ val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+ loadBufferSize = config.offsetsLoadBufferSize,
+ offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
+ offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+ offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+ offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+ offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+ offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+ val groupConfig = GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
+ groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
+
+ new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler)
+ }
+
+ def create(config: KafkaConfig,
+ zkUtils: ZkUtils,
+ offsetManager: OffsetManager): GroupCoordinator = {
+ val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+ loadBufferSize = config.offsetsLoadBufferSize,
+ offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
+ offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+ offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+ offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+ offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+ offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+ val groupConfig = GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
+ groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
+
+ new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager)
+ }
+}
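The coordinator for a group is located by hashing the group name onto a partition of the internal offsets topic: partitionFor above delegates to OffsetManager, and isCoordinatorForGroup checks whether this broker leads that partition. A minimal standalone sketch of that mapping, assuming the usual non-negative-hash-modulo scheme and the default of 50 offsets topic partitions:

object CoordinatorLookup {
  // offsets.topic.num.partitions; 50 is the broker default (assumed here)
  val offsetsTopicNumPartitions = 50

  // sketch of OffsetManager.partitionFor: non-negative hash of the group id,
  // modulo the number of __consumer_offsets partitions
  def partitionFor(groupId: String): Int =
    (groupId.hashCode & 0x7fffffff) % offsetsTopicNumPartitions

  def main(args: Array[String]): Unit = {
    // the broker leading this __consumer_offsets partition coordinates the group
    println(s"group 'my-group' -> __consumer_offsets partition ${partitionFor("my-group")}")
  }
}

A request that lands on a broker which does not lead that partition is answered with NOT_COORDINATOR_FOR_GROUP, prompting the client to rediscover its coordinator.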
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
new file mode 100644
index 0000000..60ee987
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import kafka.utils.nonthreadsafe
+
+import java.util.UUID
+
+import org.apache.kafka.common.protocol.Errors
+
+import collection.mutable
+
+private[coordinator] sealed trait GroupState { def state: Byte }
+
+/**
+ * Group is preparing to rebalance
+ *
+ * action: respond to heartbeats with REBALANCE_IN_PROGRESS
+ * respond to sync group with REBALANCE_IN_PROGRESS
+ * remove member on leave group request
+ * park join group requests from new or existing members until all expected members have joined
+ * allow offset commits from previous generation
+ * allow offset fetch requests
+ * transition: some members have joined by the timeout => AwaitingSync
+ * all members have left the group => Dead
+ */
+private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
+
+/**
+ * Group is awaiting state assignment from the leader
+ *
+ * action: respond to heartbeats with REBALANCE_IN_PROGRESS
+ * respond to offset commits with REBALANCE_IN_PROGRESS
+ * park sync group requests from followers until transition to Stable
+ * allow offset fetch requests
+ * transition: sync group with state assignment received from leader => Stable
+ * join group from new member or existing member with updated metadata => PreparingRebalance
+ * leave group from existing member => PreparingRebalance
+ * member failure detected => PreparingRebalance
+ */
+private[coordinator] case object AwaitingSync extends GroupState { val state: Byte = 5 }
+
+/**
+ * Group is stable
+ *
+ * action: respond to member heartbeats normally
+ * respond to sync group from any member with current assignment
+ * respond to join group from followers with matching metadata with current group metadata
+ * allow offset commits from member of current generation
+ * allow offset fetch requests
+ * transition: member failure detected via heartbeat => PreparingRebalance
+ * leave group from existing member => PreparingRebalance
+ * leader join-group received => PreparingRebalance
+ * follower join-group with new metadata => PreparingRebalance
+ */
+private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
+
+/**
+ * Group has no more members
+ *
+ * action: respond to join group with UNKNOWN_MEMBER_ID
+ * respond to sync group with UNKNOWN_MEMBER_ID
+ * respond to heartbeat with UNKNOWN_MEMBER_ID
+ * respond to leave group with UNKNOWN_MEMBER_ID
+ * respond to offset commit with UNKNOWN_MEMBER_ID
+ * allow offset fetch requests
+ * transition: Dead is a final state before group metadata is cleaned up, so there are no transitions
+ */
+private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
+
+
+private object GroupMetadata {
+ private val validPreviousStates: Map[GroupState, Set[GroupState]] =
+ Map(Dead -> Set(PreparingRebalance),
+ AwaitingSync -> Set(PreparingRebalance),
+ Stable -> Set(AwaitingSync),
+ PreparingRebalance -> Set(Stable, AwaitingSync))
+}
+
+/**
+ * Group contains the following metadata:
+ *
+ * Membership metadata:
+ * 1. Members registered in this group
+ * 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
+ * 3. Protocol metadata associated with group members
+ *
+ * State metadata:
+ * 1. group state
+ * 2. generation id
+ * 3. leader id
+ */
+@nonthreadsafe
+private[coordinator] class GroupMetadata(val groupId: String, val protocolType: String) {
+
+ private val members = new mutable.HashMap[String, MemberMetadata]
+ private var state: GroupState = Stable
+ var generationId = 0
+ var leaderId: String = null
+ var protocol: String = null
+
+ def is(groupState: GroupState) = state == groupState
+ def not(groupState: GroupState) = state != groupState
+ def has(memberId: String) = members.contains(memberId)
+ def get(memberId: String) = members(memberId)
+
+ def add(memberId: String, member: MemberMetadata) {
+ assert(supportsProtocols(member.protocols))
+
+ if (leaderId == null)
+ leaderId = memberId
+ members.put(memberId, member)
+ }
+
+ def remove(memberId: String) {
+ members.remove(memberId)
+ if (memberId == leaderId) {
+ leaderId = if (members.isEmpty) {
+ null
+ } else {
+ members.keys.head
+ }
+ }
+ }
+
+ def currentState = state
+
+ def isEmpty = members.isEmpty
+
+ def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
+
+ def allMembers = members.values.toList
+
+ def rebalanceTimeout = members.values.foldLeft(0) { (timeout, member) =>
+ timeout.max(member.sessionTimeoutMs)
+ }
+
+ // TODO: decide if ids should be predictable or random
+ def generateNextMemberId = UUID.randomUUID().toString
+
+ def canRebalance = state == Stable || state == AwaitingSync
+
+ def transitionTo(groupState: GroupState) {
+ assertValidTransition(groupState)
+ state = groupState
+ }
+
+ def selectProtocol: String = {
+ if (members.isEmpty)
+ throw new IllegalStateException("Cannot select protocol for empty group")
+
+ // select the protocol for this group which is supported by all members
+ val candidates = candidateProtocols
+
+ // let each member vote for one of the protocols and choose the one with the most votes
+ val votes: List[(String, Int)] = allMembers
+ .map(_.vote(candidates))
+ .groupBy(identity)
+ .mapValues(_.size)
+ .toList
+
+ votes.maxBy(_._2)._1
+ }
+
+ private def candidateProtocols = {
+ // get the set of protocols that are commonly supported by all members
+ allMembers
+ .map(_.protocols)
+ .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
+ }
+
+ def supportsProtocols(memberProtocols: Set[String]) = {
+ isEmpty || (memberProtocols & candidateProtocols).nonEmpty
+ }
+
+ def initNextGeneration = {
+ assert(notYetRejoinedMembers == List.empty[MemberMetadata])
+ generationId += 1
+ protocol = selectProtocol
+ transitionTo(AwaitingSync)
+ }
+
+ def currentMemberMetadata: Map[String, Array[Byte]] = {
+ if (is(Dead) || is(PreparingRebalance))
+ throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
+ members.map { case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol)) }.toMap
+ }
+
+ private def assertValidTransition(targetState: GroupState) {
+ if (!GroupMetadata.validPreviousStates(targetState).contains(state))
+ throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
+ .format(groupId, GroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
+ }
+}
\ No newline at end of file
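selectProtocol above elects the group's protocol in two steps: intersect every member's supported protocols to get the candidate set, then let each member vote for its most-preferred candidate and take the plurality winner. A self-contained sketch of that election (member names and preferences are illustrative):

object ProtocolVote {
  // each member lists its protocols in preference order
  val preferences = Map(
    "member-1" -> List("range", "roundrobin"),
    "member-2" -> List("roundrobin", "range"),
    "member-3" -> List("range", "roundrobin"))

  def main(args: Array[String]): Unit = {
    // candidates: protocols supported by every member (cf. candidateProtocols)
    val candidates = preferences.values.map(_.toSet).reduceLeft(_ & _)
    // each member votes for its first supported candidate (cf. MemberMetadata.vote)
    val votes = preferences.values.map(_.find(candidates.contains).get)
    val winner = votes.groupBy(identity).map { case (p, vs) => (p, vs.size) }.maxBy(_._2)._1
    println(s"selected protocol: $winner") // range, by two first-choice votes to one
  }
}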
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
new file mode 100644
index 0000000..7f7df9a
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util
+
+import kafka.utils.nonthreadsafe
+
+import scala.collection.Map
+
+/**
+ * Member metadata contains the following metadata:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ * its rebalance callback will be kept in the metadata if the
+ * member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ * is kept in metadata until the leader provides the group assignment
+ * and the group transitions to stable
+ */
+@nonthreadsafe
+private[coordinator] class MemberMetadata(val memberId: String,
+ val groupId: String,
+ val sessionTimeoutMs: Int,
+ var supportedProtocols: List[(String, Array[Byte])]) {
+
+ var assignment: Array[Byte] = null
+ var awaitingJoinCallback: JoinGroupResult => Unit = null
+ var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
+ var latestHeartbeat: Long = -1
+ var isLeaving: Boolean = false
+
+ def protocols = supportedProtocols.map(_._1).toSet
+
+ /**
+ * Get metadata corresponding to the provided protocol.
+ */
+ def metadata(protocol: String): Array[Byte] = {
+ supportedProtocols.find(_._1 == protocol) match {
+ case Some((_, metadata)) => metadata
+ case None =>
+ throw new IllegalArgumentException("Member does not support protocol")
+ }
+ }
+
+ /**
+ * Check if the provided protocol metadata matches the currently stored metadata.
+ */
+ def matches(protocols: List[(String, Array[Byte])]): Boolean = {
+ if (protocols.size != this.supportedProtocols.size)
+ return false
+
+ for (i <- 0 until protocols.size) {
+ val p1 = protocols(i)
+ val p2 = supportedProtocols(i)
+ if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
+ return false
+ }
+ return true
+ }
+
+ /**
+ * Vote for one of the potential group protocols. This takes into account the protocol preference as
+ * indicated by the order of supported protocols and returns the first one that is also contained in the candidate set.
+ */
+ def vote(candidates: Set[String]): String = {
+ supportedProtocols.find { case (protocol, _) => candidates.contains(protocol) } match {
+ case Some((protocol, _)) => protocol
+ case None =>
+ throw new IllegalArgumentException("Member does not support any of the candidate protocols")
+ }
+ }
+
+}
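matches above treats the protocol list as significant in both content and order: a rejoining member is considered unchanged only if it presents the same protocols, in the same preference order, with byte-identical metadata. A compact equivalent of that check:

import java.util

object ProtocolMatch {
  def matches(a: List[(String, Array[Byte])], b: List[(String, Array[Byte])]): Boolean =
    a.size == b.size && a.zip(b).forall { case ((n1, m1), (n2, m2)) =>
      n1 == n2 && util.Arrays.equals(m1, m2)
    }

  def main(args: Array[String]): Unit = {
    val x = List(("range", Array[Byte](1)), ("roundrobin", Array[Byte](2)))
    println(matches(x, x))         // true: same content, same order
    println(matches(x, x.reverse)) // false: preference order differs
  }
}

This is what lets doJoinGroup in GroupCoordinator short-circuit a duplicate JoinGroup from a member whose metadata has not changed, while any real change forces a rebalance.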
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala b/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
deleted file mode 100644
index 8499bf8..0000000
--- a/core/src/main/scala/kafka/coordinator/PartitionAssignor.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.common.TopicAndPartition
-import kafka.utils.CoreUtils
-
-private[coordinator] trait PartitionAssignor {
- /**
- * Assigns partitions to consumers in a group.
- * @return A mapping from consumer to assigned partitions.
- */
- def assign(topicsPerConsumer: Map[String, Set[String]],
- partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]]
-
- protected def fill[K, V](vsPerK: Map[K, Set[V]], expectedKs: Set[K]): Map[K, Set[V]] = {
- val unfilledKs = expectedKs -- vsPerK.keySet
- vsPerK ++ unfilledKs.map(k => (k, Set.empty[V]))
- }
-
- protected def aggregate[K, V](pairs: Seq[(K, V)]): Map[K, Set[V]] = {
- pairs
- .groupBy { case (k, v) => k }
- .map { case (k, kvPairs) => (k, kvPairs.map(_._2).toSet) }
- }
-
- protected def invert[K, V](vsPerK: Map[K, Set[V]]): Map[V, Set[K]] = {
- val vkPairs = vsPerK.toSeq.flatMap { case (k, vs) => vs.map(v => (v, k)) }
- aggregate(vkPairs)
- }
-}
-
-private[coordinator] object PartitionAssignor {
- val strategies = Set("range", "roundrobin")
-
- def createInstance(strategy: String) = strategy match {
- case "roundrobin" => new RoundRobinAssignor()
- case _ => new RangeAssignor()
- }
-}
-
-/**
- * The roundrobin assignor lays out all the available partitions and all the available consumers. It
- * then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer
- * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
- * will be within a delta of exactly one across all consumers.)
- *
- * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
- * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
- *
- * The assignment will be:
- * C0 -> [t0p0, t0p2, t1p1]
- * C1 -> [t0p1, t1p0, t1p2]
- */
-private[coordinator] class RoundRobinAssignor extends PartitionAssignor {
- override def assign(topicsPerConsumer: Map[String, Set[String]],
- partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = {
- val consumers = topicsPerConsumer.keys.toSeq.sorted
- val topics = topicsPerConsumer.values.flatten.toSeq.distinct.sorted
-
- val allTopicPartitions = topics.flatMap { topic =>
- val numPartitionsForTopic = partitionsPerTopic(topic)
- (0 until numPartitionsForTopic).map(partition => TopicAndPartition(topic, partition))
- }
-
- var consumerAssignor = CoreUtils.circularIterator(consumers)
- val consumerPartitionPairs = allTopicPartitions.map { topicAndPartition =>
- consumerAssignor = consumerAssignor.dropWhile(consumerId => !topicsPerConsumer(consumerId).contains(topicAndPartition.topic))
- val consumer = consumerAssignor.next()
- (consumer, topicAndPartition)
- }
- fill(aggregate(consumerPartitionPairs), topicsPerConsumer.keySet)
- }
-}
-
-/**
- * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
- * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
- * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
- * divide, then the first few consumers will have one extra partition.
- *
- * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
- * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
- *
- * The assignment will be:
- * C0 -> [t0p0, t0p1, t1p0, t1p1]
- * C1 -> [t0p2, t1p2]
- */
-private[coordinator] class RangeAssignor extends PartitionAssignor {
- override def assign(topicsPerConsumer: Map[String, Set[String]],
- partitionsPerTopic: Map[String, Int]): Map[String, Set[TopicAndPartition]] = {
- val consumersPerTopic = invert(topicsPerConsumer)
- val consumerPartitionPairs = consumersPerTopic.toSeq.flatMap { case (topic, consumersForTopic) =>
- val numPartitionsForTopic = partitionsPerTopic(topic)
-
- val numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size
- val consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size
-
- consumersForTopic.toSeq.sorted.zipWithIndex.flatMap { case (consumerForTopic, consumerIndex) =>
- val startPartition = numPartitionsPerConsumer * consumerIndex + consumerIndex.min(consumersWithExtraPartition)
- val numPartitions = numPartitionsPerConsumer + (if (consumerIndex + 1 > consumersWithExtraPartition) 0 else 1)
-
- // The first few consumers pick up an extra partition, if any.
- (startPartition until startPartition + numPartitions)
- .map(partition => (consumerForTopic, TopicAndPartition(topic, partition)))
- }
- }
- fill(aggregate(consumerPartitionPairs), topicsPerConsumer.keySet)
- }
-}
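Although this server-side assignor is deleted by the patch (assignment now runs client-side), the range arithmetic described in its doc comment is easy to miss. A standalone sketch of the per-topic split, where the first numPartitionsForTopic % numConsumers consumers each take one extra partition:

object RangeMath {
  def main(args: Array[String]): Unit = {
    val consumers = List("C0", "C1").sorted // lexicographic order, as in RangeAssignor
    val numPartitionsForTopic = 3
    val perConsumer = numPartitionsForTopic / consumers.size // 1
    val withExtra = numPartitionsForTopic % consumers.size   // 1
    consumers.zipWithIndex.foreach { case (consumer, i) =>
      val start = perConsumer * i + math.min(i, withExtra)
      val count = perConsumer + (if (i + 1 > withExtra) 0 else 1)
      // prints C0 -> partitions 0,1 and C1 -> partitions 2, matching the per-topic
      // halves of the [t0p0, t0p1, t1p0, t1p1] / [t0p2, t1p2] example above
      println(s"$consumer -> partitions ${(start until start + count).mkString(",")}")
    }
  }
}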
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
deleted file mode 100644
index 4345a8e..0000000
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.javaapi
-
-import java.nio.ByteBuffer
-import kafka.cluster.BrokerEndPoint
-
-class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
-
- def errorCode = underlying.errorCode
-
- def coordinator: BrokerEndPoint = {
- import kafka.javaapi.Implicits._
- underlying.coordinatorOpt
- }
-
- override def equals(other: Any) = canEqual(other) && {
- val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.ConsumerMetadataResponse]
- this.underlying.equals(otherConsumerMetadataResponse.underlying)
- }
-
- def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.ConsumerMetadataResponse]
-
- override def hashCode = underlying.hashCode
-
- override def toString = underlying.toString
-
-}
-
-object ConsumerMetadataResponse {
- def readFrom(buffer: ByteBuffer) = new ConsumerMetadataResponse(kafka.api.ConsumerMetadataResponse.readFrom(buffer))
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
new file mode 100644
index 0000000..b94aa01
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import kafka.cluster.BrokerEndPoint
+
+class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataResponse) {
+
+ def errorCode = underlying.errorCode
+
+ def coordinator: BrokerEndPoint = {
+ import kafka.javaapi.Implicits._
+ underlying.coordinatorOpt
+ }
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupMetadataResponse]
+ this.underlying.equals(otherConsumerMetadataResponse.underlying)
+ }
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupMetadataResponse]
+
+ override def hashCode = underlying.hashCode
+
+ override def toString = underlying.toString
+
+}
+
+object GroupMetadataResponse {
+ def readFrom(buffer: ByteBuffer) = new GroupMetadataResponse(kafka.api.GroupMetadataResponse.readFrom(buffer))
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index 3b8312d..ceb6348 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -33,8 +33,8 @@ case object Topic extends ResourceType {
val name = "Topic"
}
-case object ConsumerGroup extends ResourceType {
- val name = "ConsumerGroup"
+case object Group extends ResourceType {
+ val name = "Group"
}
@@ -45,5 +45,5 @@ object ResourceType {
rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
}
- def values: Seq[ResourceType] = List(Cluster, Topic, ConsumerGroup)
+ def values: Seq[ResourceType] = List(Cluster, Topic, Group)
}
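Since fromString resolves names against ResourceType.name, ACLs referring to the old "ConsumerGroup" name stop resolving after this rename. A hedged sketch of the lookup, modeled on the getOrElse/throw shown above (the case-insensitive match and the exception type used here are assumptions):

object ResourceTypeLookup {
  sealed trait ResourceType { def name: String }
  case object Cluster extends ResourceType { val name = "Cluster" }
  case object Topic extends ResourceType { val name = "Topic" }
  case object Group extends ResourceType { val name = "Group" }
  val values: Seq[ResourceType] = List(Cluster, Topic, Group)

  def fromString(resourceType: String): ResourceType =
    values.find(_.name.equalsIgnoreCase(resourceType)).getOrElse(
      throw new IllegalArgumentException(resourceType + " not a valid resourceType name. " +
        "The valid names are " + values.map(_.name).mkString(",")))

  def main(args: Array[String]): Unit = {
    println(fromString("Group")) // resolves to Group
    // fromString("ConsumerGroup") would now throw
  }
}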
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6acab8d..c80bd46 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,24 +17,26 @@
package kafka.server
-import kafka.message.MessageSet
-import kafka.security.auth.Topic
-import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.TopicPartition
-import kafka.api._
+import java.nio.ByteBuffer
+
import kafka.admin.AdminUtils
+import kafka.api._
import kafka.common._
import kafka.controller.KafkaController
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.{GroupCoordinator, JoinGroupResult}
import kafka.log._
+import kafka.message.MessageSet
import kafka.network._
import kafka.network.RequestChannel.{Session, Response}
-import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend}
-import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
-import scala.collection._
+import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
+import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
-import kafka.security.auth.{Authorizer, Read, Write, Create, ClusterAction, Describe, Resource, Topic, Operation, ConsumerGroup}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection._
/**
@@ -42,7 +44,7 @@ import kafka.security.auth.{Authorizer, Read, Write, Create, ClusterAction, Desc
*/
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
- val coordinator: ConsumerCoordinator,
+ val coordinator: GroupCoordinator,
val controller: KafkaController,
val zkUtils: ZkUtils,
val brokerId: Int,
@@ -73,10 +75,11 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
- case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
+ case RequestKeys.GroupMetadataKey => handleGroupMetadataRequest(request)
case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
+ case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -114,12 +117,12 @@ class KafkaApis(val requestChannel: RequestChannel,
// for each new leader or follower, call coordinator to handle
// consumer group migration
result.updatedLeaders.foreach { case partition =>
- if (partition.topic == ConsumerCoordinator.OffsetsTopicName)
+ if (partition.topic == GroupCoordinator.OffsetsTopicName)
coordinator.handleGroupImmigration(partition.partitionId)
}
result.updatedFollowers.foreach { case partition =>
partition.leaderReplicaIdOpt.foreach { leaderReplica =>
- if (partition.topic == ConsumerCoordinator.OffsetsTopicName &&
+ if (partition.topic == GroupCoordinator.OffsetsTopicName &&
leaderReplica == brokerId)
coordinator.handleGroupEmigration(partition.partitionId)
}
@@ -188,7 +191,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
case (topicAndPartition, offsetMetadata) =>
authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) &&
- authorize(request.session, Read, new Resource(ConsumerGroup, offsetCommitRequest.groupId))
+ authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))
}
// the callback for sending an offset commit response
@@ -268,7 +271,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// call coordinator to handle commit offset
coordinator.handleCommitOffsets(
offsetCommitRequest.groupId,
- offsetCommitRequest.consumerId,
+ offsetCommitRequest.memberId,
offsetCommitRequest.groupGenerationId,
offsetData,
sendResponseCallback)
@@ -526,9 +529,9 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topics.size > 0 && topicResponses.size != topics.size) {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
- if (topic == ConsumerCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) {
+ if (topic == GroupCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) {
try {
- if (topic == ConsumerCoordinator.OffsetsTopicName) {
+ if (topic == GroupCoordinator.OffsetsTopicName) {
val aliveBrokers = metadataCache.getAliveBrokers
val offsetsTopicReplicationFactor =
if (aliveBrokers.length > 0)
@@ -610,7 +613,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) &&
- authorize(request.session, Read, new Resource(ConsumerGroup, offsetFetchRequest.groupId))
+ authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))
}
val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode)
@@ -659,29 +662,29 @@ class KafkaApis(val requestChannel: RequestChannel,
/*
* Handle a consumer metadata request
*/
- def handleConsumerMetadataRequest(request: RequestChannel.Request) {
- val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
+ def handleGroupMetadataRequest(request: RequestChannel.Request) {
+ val groupMetadataRequest = request.requestObj.asInstanceOf[GroupMetadataRequest]
- if (!authorize(request.session, Read, new Resource(ConsumerGroup, consumerMetadataRequest.group))) {
- val response = ConsumerMetadataResponse(None, ErrorMapping.AuthorizationCode, consumerMetadataRequest.correlationId)
+ if (!authorize(request.session, Read, new Resource(Group, groupMetadataRequest.group))) {
+ val response = GroupMetadataResponse(None, ErrorMapping.AuthorizationCode, groupMetadataRequest.correlationId)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
} else {
- val partition = coordinator.partitionFor(consumerMetadataRequest.group)
+ val partition = coordinator.partitionFor(groupMetadataRequest.group)
- //get metadata (and create the topic if necessary)
- val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName), request.securityProtocol).head
+ // get metadata (and create the topic if necessary)
+ val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.OffsetsTopicName), request.securityProtocol).head
- val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
+ val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId)
val response =
offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata =>
partitionMetadata.leader.map { leader =>
- ConsumerMetadataResponse(Some(leader), ErrorMapping.NoError, consumerMetadataRequest.correlationId)
+ GroupMetadataResponse(Some(leader), ErrorMapping.NoError, groupMetadataRequest.correlationId)
}.getOrElse(errorResponse)
}.getOrElse(errorResponse)
trace("Sending consumer metadata %s for correlation id %d to client %s."
- .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId))
+ .format(response, groupMetadataRequest.correlationId, groupMetadataRequest.clientId))
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
}
@@ -690,39 +693,65 @@ class KafkaApis(val requestChannel: RequestChannel,
import JavaConversions._
val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
- val respHeader = new ResponseHeader(request.header.correlationId)
+ val responseHeader = new ResponseHeader(request.header.correlationId)
// the callback for sending a join-group response
- def sendResponseCallback(partitions: Set[TopicAndPartition], consumerId: String, generationId: Int, errorCode: Short) {
- val partitionList = if (errorCode == ErrorMapping.NoError)
- partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
- else
- List.empty.toBuffer
-
- val responseBody = new JoinGroupResponse(errorCode, generationId, consumerId, partitionList)
-
+ def sendResponseCallback(joinResult: JoinGroupResult) {
+ val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
+ val responseBody = new JoinGroupResponse(joinResult.errorCode, joinResult.generationId, joinResult.subProtocol,
+ joinResult.memberId, joinResult.leaderId, members)
trace("Sending join group response %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
- requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
- // ensure that the client is authorized to join the group and read from all subscribed topics
- if (!authorize(request.session, Read, new Resource(ConsumerGroup, joinGroupRequest.groupId())) ||
- joinGroupRequest.topics().exists(topic => !authorize(request.session, Read, new Resource(Topic, topic)))) {
- val responseBody = new JoinGroupResponse(ErrorMapping.AuthorizationCode, 0, joinGroupRequest.consumerId(), List.empty[TopicPartition])
- requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
+ if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
+ val responseBody = new JoinGroupResponse(
+ ErrorMapping.AuthorizationCode,
+ JoinGroupResponse.UNKNOWN_GENERATION_ID,
+ JoinGroupResponse.UNKNOWN_PROTOCOL,
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
+ JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId
+ Map.empty[String, ByteBuffer])
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
// let the coordinator handle the join-group request
+ val protocols = joinGroupRequest.groupProtocols().map(protocol =>
+ (protocol.name, Utils.toArray(protocol.metadata))).toList
coordinator.handleJoinGroup(
joinGroupRequest.groupId(),
- joinGroupRequest.consumerId(),
- joinGroupRequest.topics().toSet,
+ joinGroupRequest.memberId(),
joinGroupRequest.sessionTimeout(),
- joinGroupRequest.strategy(),
+ joinGroupRequest.protocolType(),
+ protocols,
sendResponseCallback)
}
}
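
The join-group payload changed shape here: instead of a topic set plus a strategy name, each member now submits a protocol type and an ordered list of (protocol name, opaque metadata) pairs, which the handler above unpacks into a List[(String, Array[Byte])]. A self-contained sketch of that conversion, using a stand-in case class rather than the real request types:

import java.nio.ByteBuffer

object JoinGroupProtocolSketch {
  // Stand-in for the request's per-protocol entry (hypothetical type).
  case class GroupProtocol(name: String, metadata: ByteBuffer)

  // Mirrors the (name, bytes) list handed to the coordinator above.
  def toCoordinatorFormat(protocols: Seq[GroupProtocol]): List[(String, Array[Byte])] =
    protocols.map { p =>
      val bytes = new Array[Byte](p.metadata.remaining())
      p.metadata.duplicate().get(bytes) // copy without disturbing the buffer's position
      (p.name, bytes)
    }.toList

  def main(args: Array[String]): Unit = {
    val protocols = Seq(GroupProtocol("range", ByteBuffer.wrap("subscription-bytes".getBytes("UTF-8"))))
    toCoordinatorFormat(protocols).foreach { case (name, bytes) => println(s"$name: ${bytes.length} bytes") }
  }
}
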
+ def handleSyncGroupRequest(request: RequestChannel.Request) {
+ import JavaConversions._
+
+ val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
+
+ def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
+ val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
+ val responseHeader = new ResponseHeader(request.header.correlationId)
+ requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+ }
+
+ if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
+ sendResponseCallback(Array[Byte](), ErrorMapping.AuthorizationCode)
+ } else {
+ coordinator.handleSyncGroup(
+ syncGroupRequest.groupId(),
+ syncGroupRequest.generationId(),
+ syncGroupRequest.memberId(),
+ syncGroupRequest.groupAssignment().mapValues(Utils.toArray(_)),
+ sendResponseCallback
+ )
+ }
+ }
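
SyncGroup is the second half of the new client-side protocol: the leader computes the assignment and sends a memberId-to-bytes map (the groupAssignment above), while other members send an empty map and receive their slice back. A toy sketch of what a leader-side assignor might produce, assuming a single topic and an invented serialization (real assignments use the consumer protocol schema):

object SyncGroupAssignmentSketch {
  // Round-robin the topic's partitions over the members, then serialize each
  // member's partition list (toy encoding: a comma-separated string).
  def assign(memberIds: Seq[String], numPartitions: Int): Map[String, Array[Byte]] =
    (0 until numPartitions)
      .groupBy(p => memberIds(p % memberIds.size))
      .map { case (member, parts) => member -> parts.mkString(",").getBytes("UTF-8") }

  def main(args: Array[String]): Unit =
    assign(Seq("member-1", "member-2"), 5).foreach {
      case (m, bytes) => println(s"$m -> ${new String(bytes, "UTF-8")}")
    }
}
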
+
def handleHeartbeatRequest(request: RequestChannel.Request) {
val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
val respHeader = new ResponseHeader(request.header.correlationId)
@@ -735,7 +764,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
}
- if (!authorize(request.session, Read, new Resource(ConsumerGroup, heartbeatRequest.groupId))) {
+ if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode)
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
}
@@ -743,7 +772,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// let the coordinator handle the heartbeat
coordinator.handleHeartbeat(
heartbeatRequest.groupId(),
- heartbeatRequest.consumerId(),
+ heartbeatRequest.memberId(),
heartbeatRequest.groupGenerationId(),
sendResponseCallback)
}
@@ -788,11 +817,16 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
}
- // let the coordinator to handle leave-group
- coordinator.handleLeaveGroup(
- leaveGroupRequest.groupId(),
- leaveGroupRequest.consumerId(),
- sendResponseCallback)
+ if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
+ val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.AuthorizationCode)
+ requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
+ } else {
+ // let the coordinator handle the leave-group request
+ coordinator.handleLeaveGroup(
+ leaveGroupRequest.groupId(),
+ leaveGroupRequest.consumerId(),
+ sendResponseCallback)
+ }
}
def close() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 194ee9c..b054f48 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -277,9 +277,9 @@ object KafkaConfig {
val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
val ControlledShutdownEnableProp = "controlled.shutdown.enable"
- /** ********* Consumer coordinator configuration ***********/
- val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
- val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
+ /** ********* Group coordinator configuration ***********/
+ val GroupMinSessionTimeoutMsProp = "group.min.session.timeout.ms"
+ val GroupMaxSessionTimeoutMsProp = "group.max.session.timeout.ms"
/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
@@ -619,8 +619,8 @@ object KafkaConfig {
.define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)
/** ********* Consumer coordinator configuration ***********/
- .define(ConsumerMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
- .define(ConsumerMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
+ .define(GroupMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
+ .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
/** ********* Offset management configuration ***********/
.define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc)
@@ -799,9 +799,9 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val controlledShutdownRetryBackoffMs = getLong(KafkaConfig.ControlledShutdownRetryBackoffMsProp)
val controlledShutdownEnable = getBoolean(KafkaConfig.ControlledShutdownEnableProp)
- /** ********* Consumer coordinator configuration ***********/
- val consumerMinSessionTimeoutMs = getInt(KafkaConfig.ConsumerMinSessionTimeoutMsProp)
- val consumerMaxSessionTimeoutMs = getInt(KafkaConfig.ConsumerMaxSessionTimeoutMsProp)
+ /** ********* Group coordinator configuration ***********/
+ val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
+ val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
/** ********* Offset management configuration ***********/
val offsetMetadataMaxSize = getInt(KafkaConfig.OffsetMetadataMaxSizeProp)
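
For operators, the rename above moves the broker configs from the consumer.* names to group.* with unchanged semantics. A minimal sketch of the new keys in a server.properties-style Properties object (the timeout values are illustrative, not the defaults):

import java.util.Properties

object GroupTimeoutConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("group.min.session.timeout.ms", "6000")  // was consumer.min.session.timeout.ms
    props.setProperty("group.max.session.timeout.ms", "30000") // was consumer.max.session.timeout.ms
    props.list(System.out)
  }
}
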
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index beea83a..84d48cb 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -50,7 +50,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker
import kafka.network.{BlockingChannel, SocketServer}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
-import kafka.coordinator.{ConsumerCoordinator}
+import kafka.coordinator.{GroupManagerConfig, GroupCoordinator}
object KafkaServer {
// Copy the subset of properties that are relevant to Logs
@@ -119,7 +119,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var dynamicConfigManager: DynamicConfigManager = null
- var consumerCoordinator: ConsumerCoordinator = null
+ var consumerCoordinator: GroupCoordinator = null
var kafkaController: KafkaController = null
@@ -187,7 +187,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
kafkaController.startup()
/* start kafka coordinator */
- consumerCoordinator = ConsumerCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
+ consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler)
consumerCoordinator.startup()
/* Get the authorizer and initialize it if one is specified.*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index bdc3bb6..967dc6f 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -32,7 +32,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import kafka.tools.MessageFormatter
import kafka.api.ProducerResponseStatus
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
import scala.Some
import scala.collection._
@@ -144,9 +144,9 @@ class OffsetManager(val config: OffsetManagerConfig,
// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
- val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+ val partitionOpt = replicaManager.getPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
partitionOpt.map { partition =>
- val appendPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+ val appendPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
val messages = tombstones.map(_._2).toSeq
trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
@@ -225,7 +225,7 @@ class OffsetManager(val config: OffsetManagerConfig,
)
}.toSeq
- val offsetTopicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, partitionFor(groupId))
+ val offsetTopicPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, partitionFor(groupId))
val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -336,7 +336,7 @@ class OffsetManager(val config: OffsetManagerConfig,
*/
def loadOffsetsFromLog(offsetsPartition: Int) {
- val topicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
+ val topicPartition = TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)
loadingPartitions synchronized {
if (loadingPartitions.contains(offsetsPartition)) {
@@ -408,7 +408,7 @@ class OffsetManager(val config: OffsetManagerConfig,
}
private def getHighWatermark(partitionId: Int): Long = {
- val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, partitionId)
+ val partitionOpt = replicaManager.getPartition(GroupCoordinator.OffsetsTopicName, partitionId)
val hw = partitionOpt.map { partition =>
partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -436,7 +436,7 @@ class OffsetManager(val config: OffsetManagerConfig,
}
if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
- .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)))
+ .format(numRemoved, TopicAndPartition(GroupCoordinator.OffsetsTopicName, offsetsPartition)))
}
def shutdown() {
@@ -448,7 +448,7 @@ class OffsetManager(val config: OffsetManagerConfig,
* If the topic does not exist, the configured partition count is returned.
*/
private def getOffsetsTopicPartitionCount = {
- val topic = ConsumerCoordinator.OffsetsTopicName
+ val topic = GroupCoordinator.OffsetsTopicName
val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
if (topicData(topic).nonEmpty)
topicData(topic).size
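
The expiration path above relies on log compaction: appending a null-valued (tombstone) message for an offset key makes the cleaner eventually drop that key. A hedged illustration of the same idea with a plain producer against a compacted topic (the topic name and key are invented):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TombstoneSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    // A null value is a tombstone: compaction removes the key once the cleaner runs.
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("a-compacted-topic", "some-key".getBytes("UTF-8"), null))
    producer.close()
  }
}
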
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 84bebef..f99f0d8 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -29,7 +29,7 @@ import org.junit.{Test, Before}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
/**
* Integration tests for the new consumer that cover basic usage as well as server failures
@@ -50,7 +50,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
+ this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
@@ -154,7 +154,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
val numRecords = 10000
sendRecords(numRecords)
- consumer0.subscribe(List(topic))
+ val rebalanceListener = new ConsumerRebalanceListener {
+ override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
+ // keep partitions paused in this test so that we can verify the commits based on specific seeks
+ partitions.foreach(consumer0.pause(_))
+ }
+
+ override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
+ }
+
+ consumer0.subscribe(List(topic), rebalanceListener)
val assignment = Set(tp, tp2)
TestUtils.waitUntilTrue(() => {
@@ -166,11 +175,11 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
consumer0.seek(tp2, 500)
// change subscription to trigger rebalance
- consumer0.subscribe(List(topic, topic2))
+ consumer0.subscribe(List(topic, topic2), rebalanceListener)
val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
TestUtils.waitUntilTrue(() => {
- consumer0.poll(50)
+ val records = consumer0.poll(50)
consumer0.assignment() == newAssignment.asJava
}, s"Expected partitions ${newAssignment.asJava} but actually got ${consumer0.assignment()}")
@@ -421,9 +430,9 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
consumer0.poll(50)
// get metadata for the topic
- var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
+ var parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
while(parts == null)
- parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
+ parts = consumer0.partitionsFor(GroupCoordinator.OffsetsTopicName).asScala
assertEquals(1, parts.size)
assertNotNull(parts(0).leader())
@@ -436,6 +445,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
consumer0.poll(50)
assertEquals(2, listener.callsToAssigned)
+
+ // a revocation also fires on the initial join (with no partitions assigned), hence two calls
assertEquals(2, listener.callsToRevoked)
consumer0.close()
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index db610c1..f2b0f85 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -19,13 +19,15 @@ import kafka.server.KafkaConfig
import kafka.utils.{Logging, ShutdownableThread, TestUtils}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownConsumerIdException}
+import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownMemberIdException}
import org.apache.kafka.common.TopicPartition
import org.junit.Assert._
import org.junit.{Test, Before}
import scala.collection.JavaConversions._
+
+
/**
* Integration tests for the new consumer that cover basic usage as well as server failures
*/
@@ -43,7 +45,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "10") // set small enough session timeout
+ this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -108,7 +110,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
} catch {
// TODO: should be no need to catch these exceptions once KAFKA-2017 is
// merged since coordinator fail-over will not cause a rebalance
- case _: UnknownConsumerIdException | _: IllegalGenerationException =>
+ case _: UnknownMemberIdException | _: IllegalGenerationException =>
}
}
scheduler.shutdown()
@@ -176,4 +178,6 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
}
futures.map(_.get)
}
+
+
}
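
The catch clause above simply narrows to the renamed exception. A compact sketch of the same tolerate-until-KAFKA-2017 pattern, factored out (the helper name is invented):

import org.apache.kafka.common.errors.{IllegalGenerationException, UnknownMemberIdException}

object BounceTolerance {
  // Run a commit (or other group operation), swallowing the rebalance errors
  // that coordinator fail-over can currently trigger.
  def tolerantly(op: () => Unit): Unit =
    try op()
    catch {
      case _: UnknownMemberIdException | _: IllegalGenerationException => // expected during fail-over
    }
}
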
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 2ec59fb..5741ce2 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -18,16 +18,16 @@
package kafka.api
import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import kafka.utils.TestUtils
import java.util.Properties
-import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
+
import org.junit.{After, Before}
import scala.collection.mutable.Buffer
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
/**
* A helper class for writing integration tests that involve producers, consumers, and servers
@@ -60,14 +60,14 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl)
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
- consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
for(i <- 0 until producerCount)
producers += new KafkaProducer(producerConfig)
- for(i <- 0 until consumerCount)
+ for(i <- 0 until consumerCount) {
consumers += new KafkaConsumer(consumerConfig)
+ }
// create the consumer offset topic
- TestUtils.createTopic(zkUtils, ConsumerCoordinator.OffsetsTopicName,
+ TestUtils.createTopic(zkUtils, GroupCoordinator.OffsetsTopicName,
serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
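
The removed partition.assignment.strategy = "range" line reflects the protocol change: the broker no longer validates a bare strategy name, and the new consumer takes assignor class names instead. A hedged sketch of configuring it explicitly (the RangeAssignor class path comes from the new client; the effective default may differ by version):

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

object AssignorConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
      "org.apache.kafka.clients.consumer.RangeAssignor")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.close()
  }
}
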
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index bdf7e49..735a3b2 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -101,7 +101,6 @@ class QuotasTest extends KafkaServerTestHarness {
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
- consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range")
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)
consumers += new KafkaConsumer(consumerProps)
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
deleted file mode 100644
index 1d13d88..0000000
--- a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package kafka.api
-
-import java.io.File
-
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-class SslConsumerTest extends BaseConsumerTest {
- override protected def securityProtocol = SecurityProtocol.SSL
- override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
-}