Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/26 21:11:10 UTC
[07/10] kafka git commit: KAFKA-5059: Implement Transactional Coordinator
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
new file mode 100644
index 0000000..063eee7
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -0,0 +1,1123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import java.io.PrintStream
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
+
+import com.yammer.metrics.core.Gauge
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
+import kafka.common.{MessageFormatter, _}
+import kafka.coordinator.group._
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.ReplicaManager
+import kafka.utils.CoreUtils.inLock
+import kafka.utils._
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.types.Type._
+import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.collection.JavaConverters._
+import scala.collection._
+import scala.collection.mutable.ListBuffer
+
+class GroupMetadataManager(brokerId: Int,
+ interBrokerProtocolVersion: ApiVersion,
+ config: OffsetConfig,
+ replicaManager: ReplicaManager,
+ zkUtils: ZkUtils,
+ time: Time) extends Logging with KafkaMetricsGroup {
+
+ private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
+
+ private val groupMetadataCache = new Pool[String, GroupMetadata]
+
+ /* lock protecting access to loading and owned partition sets */
+ private val partitionLock = new ReentrantLock()
+
+ /* partitions of consumer groups that are being loaded; this lock must always be acquired BEFORE the group lock if both are needed */
+ private val loadingPartitions: mutable.Set[Int] = mutable.Set()
+
+ /* partitions of consumer groups that are owned by this broker, guarded by the same partition lock */
+ private val ownedPartitions: mutable.Set[Int] = mutable.Set()
+
+ /* shutting down flag */
+ private val shuttingDown = new AtomicBoolean(false)
+
+ /* number of partitions for the consumer metadata topic */
+ private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
+
+ /* single-thread scheduler to handle offset/group metadata cache loading and unloading */
+ private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
+
+ this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
+
+ newGauge("NumOffsets",
+ new Gauge[Int] {
+ def value = groupMetadataCache.values.map(group => {
+ group synchronized { group.numOffsets }
+ }).sum
+ }
+ )
+
+ newGauge("NumGroups",
+ new Gauge[Int] {
+ def value = groupMetadataCache.size
+ }
+ )
+
+ def enableMetadataExpiration() {
+ scheduler.startup()
+
+ scheduler.schedule(name = "delete-expired-group-metadata",
+ fun = cleanupGroupMetadata,
+ period = config.offsetsRetentionCheckIntervalMs,
+ unit = TimeUnit.MILLISECONDS)
+ }
+
+ def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
+
+ def isPartitionOwned(partition: Int) = inLock(partitionLock) { ownedPartitions.contains(partition) }
+
+ def isPartitionLoading(partition: Int) = inLock(partitionLock) { loadingPartitions.contains(partition) }
+
+ def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
+
+ def isGroupLocal(groupId: String): Boolean = isPartitionOwned(partitionFor(groupId))
+
+ def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId))
+
+ def isLoading(): Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
+
+ /**
+ * Get the group associated with the given groupId, or None if not found
+ */
+ def getGroup(groupId: String): Option[GroupMetadata] = {
+ Option(groupMetadataCache.get(groupId))
+ }
+
+ /**
+ * Add a group or get the group associated with the given groupId if it already exists
+ */
+ def addGroup(group: GroupMetadata): GroupMetadata = {
+ val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group)
+ if (currentGroup != null) {
+ currentGroup
+ } else {
+ group
+ }
+ }
+
+ def prepareStoreGroup(group: GroupMetadata,
+ groupAssignment: Map[String, Array[Byte]],
+ responseCallback: Errors => Unit): Option[DelayedStore] = {
+ getMagic(partitionFor(group.groupId)) match {
+ case Some(magicValue) =>
+ val groupMetadataValueVersion = {
+ if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
+ 0.toShort
+ else
+ GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
+ }
+
+ // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ val timestampType = TimestampType.CREATE_TIME
+ val timestamp = time.milliseconds()
+ val key = GroupMetadataManager.groupMetadataKey(group.groupId)
+ val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion)
+
+ val records = {
+ val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
+ Seq(new SimpleRecord(timestamp, key, value)).asJava))
+ val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
+ builder.append(timestamp, key, value)
+ builder.build()
+ }
+
+ val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+ val groupMetadataRecords = Map(groupMetadataPartition -> records)
+ val generationId = group.generationId
+
+ // set the callback function to insert the created group into the cache after the log append completes
+ def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+ // the append response should only contain the topic partition for this group's metadata
+ if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
+ throw new IllegalStateException("Append status %s should only have one partition %s"
+ .format(responseStatus, groupMetadataPartition))
+
+ // construct the error status to propagate in the assignment response
+ val status = responseStatus(groupMetadataPartition)
+
+ val responseError = if (status.error == Errors.NONE) {
+ Errors.NONE
+ } else {
+ debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
+ s"due to ${status.error.exceptionName}")
+
+ // transform the log append error code into the corresponding commit status error code
+ status.error match {
+ case Errors.UNKNOWN_TOPIC_OR_PARTITION
+ | Errors.NOT_ENOUGH_REPLICAS
+ | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+ Errors.COORDINATOR_NOT_AVAILABLE
+
+ case Errors.NOT_LEADER_FOR_PARTITION =>
+ Errors.NOT_COORDINATOR
+
+ case Errors.REQUEST_TIMED_OUT =>
+ Errors.REBALANCE_IN_PROGRESS
+
+ case Errors.MESSAGE_TOO_LARGE
+ | Errors.RECORD_LIST_TOO_LARGE
+ | Errors.INVALID_FETCH_SIZE =>
+
+ error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
+ s"${status.error.exceptionName}, returning UNKNOWN error code to the client")
+
+ Errors.UNKNOWN
+
+ case other =>
+ error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
+ s"due to unexpected error: ${status.error.exceptionName}")
+
+ other
+ }
+ }
+
+ responseCallback(responseError)
+ }
+ Some(DelayedStore(groupMetadataRecords, putCacheCallback))
+
+ case None =>
+ responseCallback(Errors.NOT_COORDINATOR)
+ None
+ }
+ }
+
+ def store(delayedStore: DelayedStore) {
+ // call replica manager to append the group message
+ replicaManager.appendRecords(
+ config.offsetCommitTimeoutMs.toLong,
+ config.offsetCommitRequiredAcks,
+ true, // allow appending to internal offset topic
+ delayedStore.partitionRecords,
+ delayedStore.callback)
+ }
+
+ /**
+ * Store offsets by appending them to the replicated log and then inserting them into the cache
+ */
+ def prepareStoreOffsets(group: GroupMetadata,
+ consumerId: String,
+ generationId: Int,
+ offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Option[DelayedStore] = {
+ // first filter out partitions with offset metadata size exceeding limit
+ val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
+ validateOffsetMetadataLength(offsetAndMetadata.metadata)
+ }
+
+ // construct the message set to append
+ if (filteredOffsetMetadata.isEmpty) {
+ // compute the final error codes for the commit response
+ val commitStatus = offsetMetadata.mapValues(_ => Errors.OFFSET_METADATA_TOO_LARGE)
+ responseCallback(commitStatus)
+ None
+ } else {
+ getMagic(partitionFor(group.groupId)) match {
+ case Some(magicValue) =>
+ // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ val timestampType = TimestampType.CREATE_TIME
+ val timestamp = time.milliseconds()
+
+ val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
+ val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
+ val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+ new SimpleRecord(timestamp, key, value)
+ }
+ val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+ val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
+ val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
+ records.foreach(builder.append)
+ val entries = Map(offsetTopicPartition -> builder.build())
+
+ // set the callback function to insert offsets into cache after log append completed
+ def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+ // the append response should only contain the topic partition for this group's offsets
+ if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
+ throw new IllegalStateException("Append status %s should only have one partition %s"
+ .format(responseStatus, offsetTopicPartition))
+
+ // construct the commit response status and insert
+ // the offsets and metadata into the cache if the append status has no error
+ val status = responseStatus(offsetTopicPartition)
+
+ val responseError = group synchronized {
+ if (status.error == Errors.NONE) {
+ if (!group.is(Dead)) {
+ filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
+ group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
+ }
+ }
+ Errors.NONE
+ } else {
+ if (!group.is(Dead)) {
+ filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
+ group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
+ }
+ }
+
+ debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
+ s"with generation $generationId failed when appending to log due to ${status.error.exceptionName}")
+
+ // transform the log append error code into the corresponding commit status error code
+ status.error match {
+ case Errors.UNKNOWN_TOPIC_OR_PARTITION
+ | Errors.NOT_ENOUGH_REPLICAS
+ | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+ Errors.COORDINATOR_NOT_AVAILABLE
+
+ case Errors.NOT_LEADER_FOR_PARTITION =>
+ Errors.NOT_COORDINATOR
+
+ case Errors.MESSAGE_TOO_LARGE
+ | Errors.RECORD_LIST_TOO_LARGE
+ | Errors.INVALID_FETCH_SIZE =>
+ Errors.INVALID_COMMIT_OFFSET_SIZE
+
+ case other => other
+ }
+ }
+ }
+
+ // compute the final error codes for the commit response
+ val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
+ if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+ (topicPartition, responseError)
+ else
+ (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+ }
+
+ // finally trigger the callback logic passed from the API layer
+ responseCallback(commitStatus)
+ }
+
+ group synchronized {
+ group.prepareOffsetCommit(offsetMetadata)
+ }
+
+ Some(DelayedStore(entries, putCacheCallback))
+
+ case None =>
+ val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
+ (topicPartition, Errors.NOT_COORDINATOR)
+ }
+ responseCallback(commitStatus)
+ None
+ }
+ }
+ }
+
+ /**
+ * The most important guarantee that this API provides is that it never returns a stale offset: it either
+ * returns the current offset or it begins to sync the cache from the log (and returns an error code).
+ */
+ def getOffsets(groupId: String, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
+ trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+ val group = groupMetadataCache.get(groupId)
+ if (group == null) {
+ topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
+ }.toMap
+ } else {
+ group synchronized {
+ if (group.is(Dead)) {
+ topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
+ }.toMap
+ } else {
+ topicPartitionsOpt match {
+ case None =>
+ // Return offsets for all partitions owned by this consumer group. (this only applies to consumers
+ // that commit offsets to Kafka.)
+ group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
+ topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
+ }
+
+ case Some(topicPartitions) =>
+ topicPartitions.map { topicPartition =>
+ val partitionData = group.offset(topicPartition) match {
+ case None =>
+ new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)
+ case Some(offsetAndMetadata) =>
+ new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
+ }
+ topicPartition -> partitionData
+ }.toMap
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Asynchronously read the partition from the offsets topic and populate the cache
+ */
+ def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
+ val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+
+ def doLoadGroupsAndOffsets() {
+ info(s"Loading offsets and group metadata from $topicPartition")
+
+ inLock(partitionLock) {
+ if (loadingPartitions.contains(offsetsPartition)) {
+ info(s"Offset load from $topicPartition already in progress.")
+ return
+ } else {
+ loadingPartitions.add(offsetsPartition)
+ }
+ }
+
+ try {
+ loadGroupsAndOffsets(topicPartition, onGroupLoaded)
+ } catch {
+ case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
+ } finally {
+ inLock(partitionLock) {
+ ownedPartitions.add(offsetsPartition)
+ loadingPartitions.remove(offsetsPartition)
+ }
+ }
+ }
+
+ scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
+ }
+
+ private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
+ def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
+
+ val startMs = time.milliseconds()
+ replicaManager.getLog(topicPartition) match {
+ case None =>
+ warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
+
+ case Some(log) =>
+ var currOffset = log.logStartOffset
+ val buffer = ByteBuffer.allocate(config.loadBufferSize)
+ // loop breaks if the leader changes at any time during the load, since highWaterMark becomes -1
+ val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
+ val removedOffsets = mutable.Set[GroupTopicPartition]()
+ val loadedGroups = mutable.Map[String, GroupMetadata]()
+ val removedGroups = mutable.Set[String]()
+
+ while (currOffset < highWaterMark && !shuttingDown.get()) {
+ buffer.clear()
+ val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true)
+ .records.asInstanceOf[FileRecords]
+ val bufferRead = fileRecords.readInto(buffer, 0)
+
+ MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch =>
+ for (record <- batch.asScala) {
+ require(record.hasKey, "Group metadata/offset entry key should not be null")
+ GroupMetadataManager.readMessageKey(record.key) match {
+
+ case offsetKey: OffsetKey =>
+ // load offset
+ val key = offsetKey.key
+ if (!record.hasValue) {
+ loadedOffsets.remove(key)
+ removedOffsets.add(key)
+ } else {
+ val value = GroupMetadataManager.readOffsetMessageValue(record.value)
+ loadedOffsets.put(key, value)
+ removedOffsets.remove(key)
+ }
+
+ case groupMetadataKey: GroupMetadataKey =>
+ // load group metadata
+ val groupId = groupMetadataKey.key
+ val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+ if (groupMetadata != null) {
+ trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
+ removedGroups.remove(groupId)
+ loadedGroups.put(groupId, groupMetadata)
+ } else {
+ loadedGroups.remove(groupId)
+ removedGroups.add(groupId)
+ }
+
+ case unknownKey =>
+ throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+ }
+
+ currOffset = batch.nextOffset
+ }
+ }
+
+ val (groupOffsets, emptyGroupOffsets) = loadedOffsets
+ .groupBy(_._1.group)
+ .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
+ .partition { case (group, _) => loadedGroups.contains(group) }
+
+ loadedGroups.values.foreach { group =>
+ val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
+ loadGroup(group, offsets)
+ onGroupLoaded(group)
+ }
+
+ // load groups which store offsets in Kafka, but which have no active members and thus no group
+ // metadata stored in the log
+ emptyGroupOffsets.foreach { case (groupId, offsets) =>
+ val group = new GroupMetadata(groupId)
+ loadGroup(group, offsets)
+ onGroupLoaded(group)
+ }
+
+ removedGroups.foreach { groupId =>
+ // if the cache already contains a group which should be removed, raise an error. Note that it
+ // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
+ // offset storage (i.e. by "simple" consumers)
+ if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
+ throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
+ s"loading partition $topicPartition")
+ }
+
+ if (!shuttingDown.get())
+ info("Finished loading offsets from %s in %d milliseconds."
+ .format(topicPartition, time.milliseconds() - startMs))
+ }
+ }
+ }
+
+ private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {
+ // offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent
+ // view of the group's offsets
+ val loadedOffsets = offsets.mapValues { offsetAndMetadata =>
+ // special handling for version 0:
+ // set the expiration time stamp as commit time stamp + server default retention time
+ if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+ offsetAndMetadata.copy(expireTimestamp = offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs)
+ else
+ offsetAndMetadata
+ }
+ trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}")
+ group.initializeOffsets(loadedOffsets)
+
+ val currentGroup = addGroup(group)
+ if (group != currentGroup)
+ debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
+ s"because there is already a cached group with generation ${currentGroup.generationId}")
+ }
+
+ /**
+ * When this broker becomes a follower for an offsets topic partition, clear out the cache for groups that belong to
+ * that partition.
+ *
+ * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
+ */
+ def removeGroupsForPartition(offsetsPartition: Int,
+ onGroupUnloaded: GroupMetadata => Unit) {
+ val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+ scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
+
+ def removeGroupsAndOffsets() {
+ var numOffsetsRemoved = 0
+ var numGroupsRemoved = 0
+
+ inLock(partitionLock) {
+ // we need to guard the removal of groups from the cache with the partition lock
+ // to prevent the coordinator's check-and-get-group race condition
+ ownedPartitions.remove(offsetsPartition)
+
+ for (group <- groupMetadataCache.values) {
+ if (partitionFor(group.groupId) == offsetsPartition) {
+ onGroupUnloaded(group)
+ groupMetadataCache.remove(group.groupId, group)
+ numGroupsRemoved += 1
+ numOffsetsRemoved += group.numOffsets
+ }
+ }
+ }
+
+ if (numOffsetsRemoved > 0)
+ info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.")
+
+ if (numGroupsRemoved > 0)
+ info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.")
+ }
+ }
+
+ // visible for testing
+ private[group] def cleanupGroupMetadata(): Unit = {
+ cleanupGroupMetadata(None)
+ }
+
+ def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]) {
+ val startMs = time.milliseconds()
+ var offsetsRemoved = 0
+
+ groupMetadataCache.foreach { case (groupId, group) =>
+ val (removedOffsets, groupIsDead, generation) = group synchronized {
+ val removedOffsets = deletedTopicPartitions match {
+ case Some(topicPartitions) => group.removeOffsets(topicPartitions)
+ case None => group.removeExpiredOffsets(startMs)
+ }
+
+ if (group.is(Empty) && !group.hasOffsets) {
+ info(s"Group $groupId transitioned to Dead in generation ${group.generationId}")
+ group.transitionTo(Dead)
+ }
+ (removedOffsets, group.is(Dead), group.generationId)
+ }
+
+ val offsetsPartition = partitionFor(groupId)
+ val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+ getMagic(offsetsPartition) match {
+ case Some(magicValue) =>
+ // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ val timestampType = TimestampType.CREATE_TIME
+ val timestamp = time.milliseconds()
+
+ val partitionOpt = replicaManager.getPartition(appendPartition)
+ partitionOpt.foreach { partition =>
+ val tombstones = ListBuffer.empty[SimpleRecord]
+ removedOffsets.foreach { case (topicPartition, offsetAndMetadata) =>
+ trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
+ val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
+ tombstones += new SimpleRecord(timestamp, commitKey, null)
+ }
+ trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
+
+ // We avoid writing the tombstone when the generationId is 0, since this group is only using
+ // Kafka for offset storage.
+ if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) {
+ // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+ // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+ // retry removing this group.
+ val groupMetadataKey = GroupMetadataManager.groupMetadataKey(group.groupId)
+ tombstones += new SimpleRecord(timestamp, groupMetadataKey, null)
+ trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
+ }
+
+ if (tombstones.nonEmpty) {
+ try {
+ // do not need to require acks since even if the tombstone is lost,
+ // it will be appended again in the next purge cycle
+ val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*)
+ partition.appendRecordsToLeader(records)
+ offsetsRemoved += removedOffsets.size
+ trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
+ s"offsets and/or metadata for group $groupId")
+ } catch {
+ case t: Throwable =>
+ error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
+ s"offsets and/or metadata for group $groupId.", t)
+ // ignore and continue
+ }
+ }
+ }
+
+ case None =>
+ info(s"BrokerId $brokerId is no longer a coordinator for the group $groupId. Proceeding cleanup for other alive groups")
+ }
+ }
+
+ info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
+ }
+
+ /*
+ * Check if the offset metadata length is valid
+ */
+ private def validateOffsetMetadataLength(metadata: String) : Boolean = {
+ metadata == null || metadata.length() <= config.maxMetadataSize
+ }
+
+
+ def shutdown() {
+ shuttingDown.set(true)
+ if (scheduler.isStarted)
+ scheduler.shutdown()
+
+ // TODO: clear the caches
+ }
+
+ /**
+ * Gets the partition count of the group metadata topic from ZooKeeper.
+ * If the topic does not exist, the configured partition count is returned.
+ */
+ private def getGroupMetadataTopicPartitionCount: Int = {
+ zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName).getOrElse(config.offsetsTopicNumPartitions)
+ }
+
+ /**
+ * Check if the replica is local and return the message format version (magic value)
+ *
+ * @param partition Partition of GroupMetadataTopic
+ * @return Some(message format version) if the replica is local, None otherwise
+ */
+ private def getMagic(partition: Int): Option[Byte] =
+ replicaManager.getMagic(new TopicPartition(Topic.GroupMetadataTopicName, partition))
+
+ /**
+ * Add the partition into the owned list
+ *
+ * NOTE: this is for test only
+ */
+ def addPartitionOwnership(partition: Int) {
+ inLock(partitionLock) {
+ ownedPartitions.add(partition)
+ }
+ }
+}
+
+/**
+ * Messages stored for the group topic have versions for both the key and value fields. The key
+ * version indicates the type of the message (and also prevents different types of messages from
+ * being compacted together if they happen to have the same field values); the value version is
+ * used to evolve the messages within their data types:
+ *
+ * key version 0: group consumption offset
+ * -> value version 0: [offset, metadata, timestamp]
+ *
+ * key version 1: group consumption offset
+ * -> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]
+ *
+ * key version 2: group metadata
+ * -> value version 0: [protocol_type, generation, protocol, leader, members]
+ */
+object GroupMetadataManager {
+
+ private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
+ private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
+
+ private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
+ new Field("topic", STRING),
+ new Field("partition", INT32))
+ private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
+ private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
+ private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")
+
+ private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
+ new Field("metadata", STRING, "Associated metadata.", ""),
+ new Field("timestamp", INT64))
+ private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
+ private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
+ private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
+
+ private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
+ new Field("metadata", STRING, "Associated metadata.", ""),
+ new Field("commit_timestamp", INT64),
+ new Field("expire_timestamp", INT64))
+ private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
+ private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
+ private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
+ private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
+
+ private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
+ private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
+
+ private val MEMBER_ID_KEY = "member_id"
+ private val CLIENT_ID_KEY = "client_id"
+ private val CLIENT_HOST_KEY = "client_host"
+ private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
+ private val SESSION_TIMEOUT_KEY = "session_timeout"
+ private val SUBSCRIPTION_KEY = "subscription"
+ private val ASSIGNMENT_KEY = "assignment"
+
+ private val MEMBER_METADATA_V0 = new Schema(
+ new Field(MEMBER_ID_KEY, STRING),
+ new Field(CLIENT_ID_KEY, STRING),
+ new Field(CLIENT_HOST_KEY, STRING),
+ new Field(SESSION_TIMEOUT_KEY, INT32),
+ new Field(SUBSCRIPTION_KEY, BYTES),
+ new Field(ASSIGNMENT_KEY, BYTES))
+
+ private val MEMBER_METADATA_V1 = new Schema(
+ new Field(MEMBER_ID_KEY, STRING),
+ new Field(CLIENT_ID_KEY, STRING),
+ new Field(CLIENT_HOST_KEY, STRING),
+ new Field(REBALANCE_TIMEOUT_KEY, INT32),
+ new Field(SESSION_TIMEOUT_KEY, INT32),
+ new Field(SUBSCRIPTION_KEY, BYTES),
+ new Field(ASSIGNMENT_KEY, BYTES))
+
+ private val PROTOCOL_TYPE_KEY = "protocol_type"
+ private val GENERATION_KEY = "generation"
+ private val PROTOCOL_KEY = "protocol"
+ private val LEADER_KEY = "leader"
+ private val MEMBERS_KEY = "members"
+
+ private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
+ new Field(PROTOCOL_TYPE_KEY, STRING),
+ new Field(GENERATION_KEY, INT32),
+ new Field(PROTOCOL_KEY, NULLABLE_STRING),
+ new Field(LEADER_KEY, NULLABLE_STRING),
+ new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
+
+ private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
+ new Field(PROTOCOL_TYPE_KEY, STRING),
+ new Field(GENERATION_KEY, INT32),
+ new Field(PROTOCOL_KEY, NULLABLE_STRING),
+ new Field(LEADER_KEY, NULLABLE_STRING),
+ new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
+
+
+ // map of versions to key schemas as data types
+ private val MESSAGE_TYPE_SCHEMAS = Map(
+ 0 -> OFFSET_COMMIT_KEY_SCHEMA,
+ 1 -> OFFSET_COMMIT_KEY_SCHEMA,
+ 2 -> GROUP_METADATA_KEY_SCHEMA)
+
+ // map of versions to offset value schemas
+ private val OFFSET_VALUE_SCHEMAS = Map(
+ 0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
+ 1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
+ private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
+
+ // map of versions to group metadata value schemas
+ private val GROUP_VALUE_SCHEMAS = Map(
+ 0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
+ 1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
+ private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
+
+ private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
+ private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+
+ private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+ private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
+
+ private def schemaForKey(version: Int) = {
+ val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
+ schemaOpt match {
+ case Some(schema) => schema
+ case _ => throw new KafkaException("Unknown offset schema version " + version)
+ }
+ }
+
+ private def schemaForOffset(version: Int) = {
+ val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
+ schemaOpt match {
+ case Some(schema) => schema
+ case _ => throw new KafkaException("Unknown offset schema version " + version)
+ }
+ }
+
+ private def schemaForGroup(version: Int) = {
+ val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
+ schemaOpt match {
+ case Some(schema) => schema
+ case _ => throw new KafkaException("Unknown group metadata version " + version)
+ }
+ }
+
+ /**
+ * Generates the key for offset commit message for given (group, topic, partition)
+ *
+ * @return key for offset commit message
+ */
+ private[group] def offsetCommitKey(group: String, topicPartition: TopicPartition,
+ versionId: Short = 0): Array[Byte] = {
+ val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
+ key.set(OFFSET_KEY_GROUP_FIELD, group)
+ key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
+ key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)
+
+ val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
+ byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
+ key.writeTo(byteBuffer)
+ byteBuffer.array()
+ }
+
+ /**
+ * Generates the key for group metadata message for given group
+ *
+ * @return key bytes for group metadata message
+ */
+ private[group] def groupMetadataKey(group: String): Array[Byte] = {
+ val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
+ key.set(GROUP_KEY_GROUP_FIELD, group)
+
+ val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
+ byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+ key.writeTo(byteBuffer)
+ byteBuffer.array()
+ }
+
+ /**
+ * Generates the payload for offset commit message from given offset and metadata
+ *
+ * @param offsetAndMetadata consumer's current offset and metadata
+ * @return payload for offset commit message
+ */
+ private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
+ // generate commit value with schema version 1
+ val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
+ value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+ value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+ value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+ value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
+ val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
+ byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+ value.writeTo(byteBuffer)
+ byteBuffer.array()
+ }
+
+ /**
+ * Generates the payload for the group metadata message from the given group metadata and assignment,
+ * assuming the generation id, selected protocol, leader and member assignment are all available
+ *
+ * @param groupMetadata current group metadata
+ * @param assignment the assignment for the rebalancing generation
+ * @param version the version of the value message to use
+ * @return payload for group metadata message
+ */
+ private[group] def groupMetadataValue(groupMetadata: GroupMetadata,
+ assignment: Map[String, Array[Byte]],
+ version: Short = 0): Array[Byte] = {
+ val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
+
+ value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
+ value.set(GENERATION_KEY, groupMetadata.generationId)
+ value.set(PROTOCOL_KEY, groupMetadata.protocol)
+ value.set(LEADER_KEY, groupMetadata.leaderId)
+
+ val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
+ val memberStruct = value.instance(MEMBERS_KEY)
+ memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
+ memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
+ memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
+ memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
+
+ if (version > 0)
+ memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
+
+ val metadata = memberMetadata.metadata(groupMetadata.protocol)
+ memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
+
+ val memberAssignment = assignment(memberMetadata.memberId)
+ assert(memberAssignment != null)
+
+ memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
+
+ memberStruct
+ }
+
+ value.set(MEMBERS_KEY, memberArray.toArray)
+
+ val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
+ byteBuffer.putShort(version)
+ value.writeTo(byteBuffer)
+ byteBuffer.array()
+ }
+
+ /**
+ * Decodes the key of a message stored in the offsets topic
+ *
+ * @param buffer input byte-buffer
+ * @return a BaseKey object (OffsetKey or GroupMetadataKey)
+ */
+ def readMessageKey(buffer: ByteBuffer): BaseKey = {
+ val version = buffer.getShort
+ val keySchema = schemaForKey(version)
+ val key = keySchema.read(buffer)
+
+ if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
+ // versions 0 and 1 refer to offset commit keys
+ val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
+ val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
+ val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
+
+ OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))
+
+ } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
+ // version 2 refers to group metadata
+ val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]
+
+ GroupMetadataKey(version, group)
+ } else {
+ throw new IllegalStateException("Unknown version " + version + " for group metadata message")
+ }
+ }
+
+ /**
+ * Decodes the offset messages' payload and retrieves offset and metadata from it
+ *
+ * @param buffer input byte-buffer
+ * @return an offset-metadata object from the message
+ */
+ def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
+ if (buffer == null) { // tombstone
+ null
+ } else {
+ val version = buffer.getShort
+ val valueSchema = schemaForOffset(version)
+ val value = valueSchema.read(buffer)
+
+ if (version == 0) {
+ val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
+ val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]
+ val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
+
+ OffsetAndMetadata(offset, metadata, timestamp)
+ } else if (version == 1) {
+ val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
+ val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
+ val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
+ val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
+
+ OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+ } else {
+ throw new IllegalStateException("Unknown offset message version")
+ }
+ }
+ }
+
+ /**
+ * Decodes the group metadata messages' payload and retrieves its member metadata from it
+ *
+ * @param buffer input byte-buffer
+ * @return a group metadata object from the message
+ */
+ def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
+ if (buffer == null) { // tombstone
+ null
+ } else {
+ val version = buffer.getShort
+ val valueSchema = schemaForGroup(version)
+ val value = valueSchema.read(buffer)
+
+ if (version == 0 || version == 1) {
+ val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
+
+ val memberMetadataArray = value.getArray(MEMBERS_KEY)
+ val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
+
+ val group = new GroupMetadata(groupId, initialState)
+
+ group.generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
+ group.leaderId = value.get(LEADER_KEY).asInstanceOf[String]
+ group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
+
+ memberMetadataArray.foreach { memberMetadataObj =>
+ val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
+ val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
+ val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
+ val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
+ val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
+ val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
+
+ val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
+
+ val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+ protocolType, List((group.protocol, subscription)))
+
+ member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
+
+ group.add(member)
+ }
+
+ group
+ } else {
+ throw new IllegalStateException("Unknown group metadata message version")
+ }
+ }
+ }
+
+ // Formatter for use with tools such as the console consumer. The consumer should also set exclude.internal.topics to false.
+ // (specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
+ class OffsetsMessageFormatter extends MessageFormatter {
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+ Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+ // Only print if the message is an offset record.
+ // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+ case offsetKey: OffsetKey =>
+ val groupTopicPartition = offsetKey.key
+ val value = consumerRecord.value
+ val formattedValue =
+ if (value == null) "NULL"
+ else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
+ output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
+ output.write("::".getBytes(StandardCharsets.UTF_8))
+ output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
+ output.write("\n".getBytes(StandardCharsets.UTF_8))
+ case _ => // no-op
+ }
+ }
+ }
+
+ // Formatter for use with tools to read group metadata history
+ class GroupMetadataMessageFormatter extends MessageFormatter {
+ def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+ Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+ // Only print if the message is a group metadata record.
+ // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+ case groupMetadataKey: GroupMetadataKey =>
+ val groupId = groupMetadataKey.key
+ val value = consumerRecord.value
+ val formattedValue =
+ if (value == null) "NULL"
+ else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+ output.write(groupId.getBytes(StandardCharsets.UTF_8))
+ output.write("::".getBytes(StandardCharsets.UTF_8))
+ output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
+ output.write("\n".getBytes(StandardCharsets.UTF_8))
+ case _ => // no-op
+ }
+ }
+ }
+
+}
+
+case class DelayedStore(partitionRecords: Map[TopicPartition, MemoryRecords],
+ callback: Map[TopicPartition, PartitionResponse] => Unit)
+
+case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
+
+ def this(group: String, topic: String, partition: Int) =
+ this(group, new TopicPartition(topic, partition))
+
+ override def toString: String =
+ "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
+}
+
+trait BaseKey {
+ def version: Short
+ def key: Any
+}
+
+case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
+
+ override def toString: String = key.toString
+}
+
+case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
+
+ override def toString: String = key
+}
+
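Two details of the file above are easy to miss: a group's offsets live on the single __consumer_offsets partition chosen by partitionFor, and every key written to that topic carries a leading int16 schema version. Below is a minimal standalone sketch of that layout in plain java.nio rather than the Schema/Struct classes; the sign-bit mask stands in for Utils.abs, and the int16-length-prefixed UTF-8 string encoding is assumed to match the protocol STRING type:

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    object OffsetKeySketch {
      // As in partitionFor: hash the group id into one partition of the offsets topic.
      // The mask avoids the Int.MinValue pitfall of math.abs.
      def partitionFor(groupId: String, numPartitions: Int): Int =
        (groupId.hashCode & 0x7fffffff) % numPartitions

      // As in offsetCommitKey: [version: int16][group][topic][partition: int32],
      // with strings written as int16-length-prefixed UTF-8.
      def offsetCommitKey(group: String, topic: String, partition: Int): Array[Byte] = {
        val g = group.getBytes(StandardCharsets.UTF_8)
        val t = topic.getBytes(StandardCharsets.UTF_8)
        val buf = ByteBuffer.allocate(2 + (2 + g.length) + (2 + t.length) + 4)
        buf.putShort(1.toShort) // CURRENT_OFFSET_KEY_SCHEMA_VERSION
        buf.putShort(g.length.toShort); buf.put(g)
        buf.putShort(t.length.toShort); buf.put(t)
        buf.putInt(partition)
        buf.array()
      }
    }

Because the group id is part of the key, all offsets and metadata for one group land on one partition, which is what allows a single coordinator to own the group.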
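The load path (loadGroupsAndOffsets) is a replay of a compacted partition: for each key the latest record wins, and a record with a null value is a tombstone that erases the key. A toy sketch of that bookkeeping, with Option standing in for the nullable record value:

    // Later values overwrite earlier ones; None (a tombstone) removes the key.
    def replay[K, V](records: Seq[(K, Option[V])]): Map[K, V] =
      records.foldLeft(Map.empty[K, V]) {
        case (state, (k, Some(v))) => state.updated(k, v)
        case (state, (k, None))    => state - k
      }

    // replay(Seq(1 -> Some("a"), 1 -> Some("b"), 2 -> Some("c"), 2 -> None))
    //   == Map(1 -> "b")

This is also why cleanupGroupMetadata writes tombstones rather than deleting anything directly: log compaction eventually drops both the tombstone and the records it shadows.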
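putCacheCallback in prepareStoreGroup never surfaces raw log append errors; it translates them into coordinator-level errors that the client knows how to handle. A toy enumeration of that mapping (illustrative types, not the real Errors enum):

    sealed trait AppendError
    case object NotLeaderForPartition extends AppendError
    case object NotEnoughReplicas     extends AppendError
    case object RequestTimedOut       extends AppendError

    sealed trait ClientError
    case object NotCoordinator          extends ClientError
    case object CoordinatorNotAvailable extends ClientError
    case object RebalanceInProgress     extends ClientError

    def translate(error: AppendError): ClientError = error match {
      case NotLeaderForPartition => NotCoordinator          // leadership moved: rediscover the coordinator
      case NotEnoughReplicas     => CoordinatorNotAvailable // offsets topic under-replicated: retry later
      case RequestTimedOut       => RebalanceInProgress     // group write timed out: the member rejoins
    }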
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
new file mode 100644
index 0000000..ce542b1
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import java.util
+
+import kafka.utils.nonthreadsafe
+import org.apache.kafka.common.protocol.Errors
+
+
+case class MemberSummary(memberId: String,
+ clientId: String,
+ clientHost: String,
+ metadata: Array[Byte],
+ assignment: Array[Byte])
+
+/**
+ * Member metadata contains the following:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ * its rebalance callback will be kept in the metadata if the
+ * member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ * is kept in metadata until the leader provides the group assignment
+ * and the group transitions to stable
+ */
+@nonthreadsafe
+private[group] class MemberMetadata(val memberId: String,
+ val groupId: String,
+ val clientId: String,
+ val clientHost: String,
+ val rebalanceTimeoutMs: Int,
+ val sessionTimeoutMs: Int,
+ val protocolType: String,
+ var supportedProtocols: List[(String, Array[Byte])]) {
+
+ var assignment: Array[Byte] = Array.empty[Byte]
+ var awaitingJoinCallback: JoinGroupResult => Unit = null
+ var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null
+ var latestHeartbeat: Long = -1
+ var isLeaving: Boolean = false
+
+ def protocols = supportedProtocols.map(_._1).toSet
+
+ /**
+ * Get metadata corresponding to the provided protocol.
+ */
+ def metadata(protocol: String): Array[Byte] = {
+ supportedProtocols.find(_._1 == protocol) match {
+ case Some((_, metadata)) => metadata
+ case None =>
+ throw new IllegalArgumentException("Member does not support protocol")
+ }
+ }
+
+ /**
+ * Check if the provided protocol metadata matches the currently stored metadata.
+ */
+ def matches(protocols: List[(String, Array[Byte])]): Boolean = {
+ if (protocols.size != this.supportedProtocols.size)
+ return false
+
+ for (i <- 0 until protocols.size) {
+ val p1 = protocols(i)
+ val p2 = supportedProtocols(i)
+ if (p1._1 != p2._1 || !util.Arrays.equals(p1._2, p2._2))
+ return false
+ }
+ true
+ }
+
+ def summary(protocol: String): MemberSummary = {
+ MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
+ }
+
+ def summaryNoMetadata(): MemberSummary = {
+ MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
+ }
+
+ /**
+ * Vote for one of the potential group protocols. This takes into account the protocol preference as
+ * indicated by the order of supported protocols, and returns the first one that is also contained in the candidate set.
+ */
+ def vote(candidates: Set[String]): String = {
+ supportedProtocols.find({ case (protocol, _) => candidates.contains(protocol)}) match {
+ case Some((protocol, _)) => protocol
+ case None =>
+ throw new IllegalArgumentException("Member does not support any of the candidate protocols")
+ }
+ }
+
+ override def toString = {
+ "[%s,%s,%s,%d]".format(memberId, clientId, clientHost, sessionTimeoutMs)
+ }
+}
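vote above encodes the protocol election rule: supportedProtocols is ordered by the member's preference, and the vote is the first entry still contained in the candidate set (the protocols supported by every member). A standalone sketch with made-up protocol names:

    val supportedProtocols: List[(String, Array[Byte])] =
      List("range" -> Array.emptyByteArray, "roundrobin" -> Array.emptyByteArray)

    def vote(candidates: Set[String]): String =
      supportedProtocols.collectFirst { case (protocol, _) if candidates.contains(protocol) => protocol }
        .getOrElse(throw new IllegalArgumentException("Member does not support any of the candidate protocols"))

    assert(vote(Set("range", "roundrobin")) == "range") // the most preferred common protocol wins
    assert(vote(Set("roundrobin")) == "roundrobin")     // otherwise fall down the preference list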
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala b/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
new file mode 100644
index 0000000..2b9a60f
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.message.{CompressionCodec, NoCompressionCodec}
+
+/**
+ * Configuration settings for in-built offset management
+ * @param maxMetadataSize The maximum allowed size of the metadata for any offset commit.
+ * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
+ * @param offsetsRetentionMs Offsets older than this retention period will be discarded.
+ * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
+ * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
+ * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
+ * log compaction and faster offset loads.
+ * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
+ * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in
+ * order to achieve "atomic" commits.
+ * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
+ * commit or this timeout is reached. (Similar to the producer request timeout.)
+ * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
+ * should not be overridden.
+ */
+case class OffsetConfig(maxMetadataSize: Int = OffsetConfig.DefaultMaxMetadataSize,
+ loadBufferSize: Int = OffsetConfig.DefaultLoadBufferSize,
+ offsetsRetentionMs: Long = OffsetConfig.DefaultOffsetRetentionMs,
+ offsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs,
+ offsetsTopicNumPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions,
+ offsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes,
+ offsetsTopicReplicationFactor: Short = OffsetConfig.DefaultOffsetsTopicReplicationFactor,
+ offsetsTopicCompressionCodec: CompressionCodec = OffsetConfig.DefaultOffsetsTopicCompressionCodec,
+ offsetCommitTimeoutMs: Int = OffsetConfig.DefaultOffsetCommitTimeoutMs,
+ offsetCommitRequiredAcks: Short = OffsetConfig.DefaultOffsetCommitRequiredAcks)
+
+object OffsetConfig {
+ val DefaultMaxMetadataSize = 4096
+ val DefaultLoadBufferSize = 5*1024*1024
+ val DefaultOffsetRetentionMs = 24*60*60*1000L
+ val DefaultOffsetsRetentionCheckIntervalMs = 600000L
+ val DefaultOffsetsTopicNumPartitions = 50
+ val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
+ val DefaultOffsetsTopicReplicationFactor = 3.toShort
+ val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
+ val DefaultOffsetCommitTimeoutMs = 5000
+ val DefaultOffsetCommitRequiredAcks = (-1).toShort
+}
\ No newline at end of file
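Since every field of OffsetConfig has a default, callers override only what they need. In the broker these values are derived from KafkaConfig, but a standalone sketch (with hypothetical overrides) looks like:

    // All other fields keep the defaults defined above.
    val offsetConfig = OffsetConfig(
      offsetsRetentionMs = 7 * 24 * 60 * 60 * 1000L, // keep committed offsets for 7 days
      loadBufferSize = 10 * 1024 * 1024              // load offsets in 10 MB batches
    )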
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
new file mode 100644
index 0000000..9300e6a
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.transaction
+
+import kafka.server.DelayedOperation
+import org.apache.kafka.common.protocol.Errors
+
+/**
+ * Delayed transaction state change operations that are added to the purgatory without timeout (i.e. these operations should never time out)
+ */
+private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
+ completionCallback: Errors => Unit)
+ extends DelayedOperation(Long.MaxValue) {
+
+ // overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to
+ // call purgatory operations while holding the txn metadata lock.
+ override def safeTryComplete(): Boolean = tryComplete()
+
+ override def tryComplete(): Boolean = {
+ txnMetadata synchronized {
+ if (txnMetadata.topicPartitions.isEmpty)
+ forceComplete()
+ else false
+ }
+ }
+
+ override def onExpiration(): Unit = {
+ // this should never happen
+ throw new IllegalStateException(s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
+ }
+
+ override def onComplete(): Unit = completionCallback(Errors.NONE)
+
+}
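DelayedTxnMarker leans on the DelayedOperation contract: tryComplete checks a completion condition and, when it holds, forceComplete runs onComplete exactly once even under concurrent completion attempts. A self-contained toy analog of that contract (illustrative, not Kafka's actual DelayedOperation class):

    import java.util.concurrent.atomic.AtomicBoolean

    abstract class ToyDelayedOp {
      private val completed = new AtomicBoolean(false)
      def tryComplete(): Boolean
      def onComplete(): Unit
      // Exactly-once completion: only the first caller to flip the flag runs onComplete.
      final def forceComplete(): Boolean =
        if (completed.compareAndSet(false, true)) { onComplete(); true } else false
    }

    // Completes once no partitions are still waiting on the transaction marker.
    class ToyTxnMarker(remaining: () => Set[Int], callback: () => Unit) extends ToyDelayedOp {
      override def tryComplete(): Boolean =
        if (remaining().isEmpty) forceComplete() else false
      override def onComplete(): Unit = callback()
    }

Because DelayedTxnMarker is constructed with Long.MaxValue as its timeout, onExpiration is unreachable in practice, which is why it simply throws.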
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
new file mode 100644
index 0000000..bb7f57b
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.transaction
+
+import kafka.common.KafkaException
+import kafka.utils.{Json, Logging, ZkUtils}
+
+/**
+ * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds (PIDs)
+ * uniquely, such that the same PID is never assigned twice across multiple transaction coordinators.
+ *
+ * PIDs are managed via ZooKeeper: the manager claiming a block writes the latest pid block to the
+ * corresponding ZK path, where the written block_start_pid and block_end_pid are both inclusive.
+ */
+object ProducerIdManager extends Logging {
+ val CurrentVersion: Long = 1L
+ val PidBlockSize: Long = 1000L
+
+ def generatePidBlockJson(pidBlock: ProducerIdBlock): String = {
+ Json.encode(Map("version" -> CurrentVersion,
+ "broker" -> pidBlock.brokerId,
+ "block_start" -> pidBlock.blockStartPid.toString,
+ "block_end" -> pidBlock.blockEndPid.toString)
+ )
+ }
+
+ def parsePidBlockData(jsonData: String): ProducerIdBlock = {
+ try {
+ Json.parseFull(jsonData).flatMap { m =>
+ val pidBlockInfo = m.asInstanceOf[Map[String, Any]]
+ val brokerId = pidBlockInfo("broker").asInstanceOf[Int]
+ val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong
+ val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong
+ Some(ProducerIdBlock(brokerId, blockStartPID, blockEndPID))
+ }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData"))
+ } catch {
+ case e: java.lang.NumberFormatException =>
+ // this should never happen: the written data has exceeded long type limit
+ fatal(s"Read jason data $jsonData contains pids that have exceeded long type limit")
+ throw e
+ }
+ }
+}
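+
+// Illustrative example of the hand-off (values assumed): with PidBlockSize = 1000 the first claim
+// persists a payload like {"version":1,"broker":0,"block_start":"0","block_end":"999"}; the next
+// claim, by whichever broker makes it, reads that block and writes block [1000, 1999], and so on.
+// block_start/block_end are serialized as strings (see generatePidBlockJson), presumably to avoid
+// precision loss on large long values.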
+
+case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) {
+ override def toString: String = {
+ val pidBlockInfo = new StringBuilder
+ pidBlockInfo.append("(brokerId:" + brokerId)
+ pidBlockInfo.append(",blockStartPID:" + blockStartPid)
+ pidBlockInfo.append(",blockEndPID:" + blockEndPid + ")")
+ pidBlockInfo.toString()
+ }
+}
+
+class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging {
+
+ this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+ private var currentPIDBlock: ProducerIdBlock = null
+ private var nextPID: Long = -1L
+
+ // grab the first block of PIDs
+ this synchronized {
+ getNewPidBlock()
+ nextPID = currentPIDBlock.blockStartPid
+ }
+
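+ // Claims a fresh pid block with a compare-and-set loop: read the latest block and its zkVersion,
+ // derive the successor block, then conditionally write it back; if another coordinator wins the
+ // race the version check fails and the loop re-reads and retries.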
+ private def getNewPidBlock(): Unit = {
+ var zkWriteComplete = false
+ while (!zkWriteComplete) {
+ // refresh current pid block from zookeeper again
+ val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+
+ // generate the new pid block
+ currentPIDBlock = dataOpt match {
+ case Some(data) =>
+ val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+ debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion")
+
+ if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+ // we have exhausted all pids (wow!), treat it as a fatal error
+ fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})")
+ throw new KafkaException("Have exhausted all pids.")
+ }
+
+ ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize)
+ case None =>
+ debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block")
+ ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+ }
+
+ val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock)
+
+ // try to write the new pid block into zookeeper
+ val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData))
+ zkWriteComplete = succeeded
+
+ if (zkWriteComplete)
+ info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version")
+ }
+ }
+
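+ // Optional checker handed to conditionalUpdatePersistentPath: treat the update as successful if
+ // the data now stored in ZooKeeper already equals what we attempted to write, covering the case
+ // where an earlier write succeeded but its response was lost.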
+ private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+ try {
+ val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData)
+ val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+ dataOpt match {
+ case Some(data) =>
+ val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+ (currPIDBlock.equals(expectedPidBlock), zkVersion)
+ case None =>
+ (false, -1)
+ }
+ } catch {
+ case e: Exception =>
+ warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e)
+
+ (false, -1)
+ }
+ }
+
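+ // Hands out the next pid from the current block; e.g. for block (0, 999) successive calls return
+ // 0, 1, ..., 999, and the following call first claims a fresh block from ZooKeeper before
+ // returning 1000.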
+ def nextPid(): Long = {
+ this synchronized {
+ // grab a new block of PIDs if this block has been exhausted
+ if (nextPID > currentPIDBlock.blockEndPid) {
+ getNewPidBlock()
+ nextPID = currentPIDBlock.blockStartPid + 1
+ } else {
+ nextPID += 1
+ }
+
+ nextPID - 1
+ }
+ }
+
+ def shutdown() {
+ info(s"Shutdown complete: last PID assigned $nextPID")
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
new file mode 100644
index 0000000..343f7ea
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.transaction
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager}
+import kafka.utils.{Logging, Scheduler, ZkUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.TransactionResult
+import org.apache.kafka.common.utils.Time
+import kafka.utils.CoreUtils.inWriteLock
+
+
+object TransactionCoordinator {
+
+ def apply(config: KafkaConfig,
+ replicaManager: ReplicaManager,
+ scheduler: Scheduler,
+ zkUtils: ZkUtils,
+ metrics: Metrics,
+ metadataCache: MetadataCache,
+ time: Time): TransactionCoordinator = {
+
+ val txnConfig = TransactionConfig(config.transactionalIdExpirationMs,
+ config.transactionMaxTimeoutMs,
+ config.transactionTopicPartitions,
+ config.transactionTopicReplicationFactor,
+ config.transactionTopicSegmentBytes,
+ config.transactionsLoadBufferSize,
+ config.transactionTopicMinISR)
+
+ val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
+ val logManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time)
+ val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId)
+ val transactionMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnMarkerPurgatory, time)
+
+ new TransactionCoordinator(config.brokerId, pidManager, logManager, transactionMarkerChannelManager, txnMarkerPurgatory, time)
+ }
+
+ private def initTransactionError(error: Errors): InitPidResult = {
+ InitPidResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, error)
+ }
+
+ private def initTransactionMetadata(txnMetadata: TransactionMetadata): InitPidResult = {
+ InitPidResult(txnMetadata.pid, txnMetadata.producerEpoch, Errors.NONE)
+ }
+}
+
+/**
+ * The transaction coordinator handles message transactions sent by producers and communicates with
+ * brokers to update the status of ongoing transactions.
+ *
+ * Each Kafka server instantiates a transaction coordinator, which is responsible for a set of
+ * producers. Producers with specific transactional ids are assigned to their corresponding coordinators;
+ * producers with no transactional id may talk to a random broker as their coordinator.
+ */
+class TransactionCoordinator(brokerId: Int,
+ pidManager: ProducerIdManager,
+ txnManager: TransactionStateManager,
+ txnMarkerChannelManager: TransactionMarkerChannelManager,
+ txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
+ time: Time) extends Logging {
+ this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
+
+ import TransactionCoordinator._
+
+ type InitPidCallback = InitPidResult => Unit
+ type TxnMetadataUpdateCallback = Errors => Unit
+ type EndTxnCallback = Errors => Unit
+
+ /* Active flag of the coordinator */
+ private val isActive = new AtomicBoolean(false)
+
+ private val coordinatorLock = new ReentrantReadWriteLock
+
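+ // InitPid validation order: a missing transactional id short-circuits to a fresh pid; otherwise
+ // the request is rejected if this broker is not the coordinator for the id, is still loading the
+ // id's transaction topic partition, or the requested timeout exceeds the broker-configured maximum.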
+ def handleInitPid(transactionalId: String,
+ transactionTimeoutMs: Int,
+ responseCallback: InitPidCallback): Unit = {
+ if (transactionalId == null || transactionalId.isEmpty) {
+ // if the transactional id is not specified, then always blindly accept the request
+ // and return a new pid from the pid manager
+ val pid = pidManager.nextPid()
+ responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
+ } else if (!txnManager.isCoordinatorFor(transactionalId)) {
+ // check if it is the assigned coordinator for the transactional id
+ responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
+ } else if (txnManager.isCoordinatorLoadingInProgress(transactionalId)) {
+ responseCallback(initTransactionError(Errors.COORDINATOR_LOAD_IN_PROGRESS))
+ } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
+ // check transactionTimeoutMs is not larger than the broker configured maximum allowed value
+ responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
+ } else {
+ // only try to get a new pid and update the cache if the transactional id is unknown
+ txnManager.getTransactionState(transactionalId) match {
+ case None =>
+ val pid = pidManager.nextPid()
+ val newMetadata: TransactionMetadata = new TransactionMetadata(pid = pid,
+ producerEpoch = 0,
+ txnTimeoutMs = transactionTimeoutMs,
+ state = Empty,
+ topicPartitions = collection.mutable.Set.empty[TopicPartition],
+ lastUpdateTimestamp = time.milliseconds())
+
+ val metadata = txnManager.addTransaction(transactionalId, newMetadata)
+
+ // there might be a concurrent thread that has just updated the mapping
+ // with the transactional id at the same time; in this case we treat it
+ // as if the metadata already existed and update it accordingly
+ metadata synchronized {
+ if (!metadata.eq(newMetadata))
+ initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, responseCallback, metadata)
+ else
+ appendMetadataToLog(transactionalId, metadata, responseCallback)
+ }
+ case Some(metadata) =>
+ initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, responseCallback, metadata)
+ }
+ }
+ }
+
+ private def appendMetadataToLog(transactionalId: String,
+ metadata: TransactionMetadata,
+ initPidCallback: InitPidCallback): Unit = {
+ def callback(errors: Errors): Unit = {
+ if (errors == Errors.NONE)
+ initPidCallback(initTransactionMetadata(metadata))
+ else
+ initPidCallback(initTransactionError(errors))
+ }
+ appendToLogInReadLock(transactionalId, metadata, callback)
+ }
+
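+ // Re-initializing a pid depends on the current state: an Ongoing transaction is aborted first,
+ // a PrepareCommit/PrepareAbort waits in the marker purgatory for completion and then retries,
+ // and any other state bumps the producer epoch and resets the metadata to Empty.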
+ private def initPidWithExistingMetadata(transactionalId: String,
+ transactionTimeoutMs: Int,
+ responseCallback: InitPidCallback,
+ metadata: TransactionMetadata) = {
+
+ metadata synchronized {
+ if (metadata.state == Ongoing) {
+ // abort the ongoing transaction
+ handleEndTransaction(transactionalId,
+ metadata.pid,
+ metadata.producerEpoch,
+ TransactionResult.ABORT,
+ (errors: Errors) => {
+ if (errors != Errors.NONE) {
+ responseCallback(initTransactionError(errors))
+ } else {
+ // init pid again
+ handleInitPid(transactionalId, transactionTimeoutMs, responseCallback)
+ }
+ })
+ } else if (metadata.state == PrepareAbort || metadata.state == PrepareCommit) {
+ // wait for the commit to complete and then init pid again
+ txnMarkerPurgatory.tryCompleteElseWatch(new DelayedTxnMarker(metadata, (errors: Errors) => {
+ if (errors != Errors.NONE)
+ responseCallback(initTransactionError(errors))
+ else
+ handleInitPid(transactionalId, transactionTimeoutMs, responseCallback)
+ }), Seq(metadata.pid))
+ } else {
+ metadata.producerEpoch = (metadata.producerEpoch + 1).toShort
+ metadata.txnTimeoutMs = transactionTimeoutMs
+ metadata.topicPartitions.clear()
+ metadata.lastUpdateTimestamp = time.milliseconds()
+ metadata.state = Empty
+ appendMetadataToLog(transactionalId, metadata, responseCallback)
+ }
+ }
+ }
+
+ private def validateTransactionalId(transactionalId: String): Errors =
+ if (transactionalId == null || transactionalId.isEmpty)
+ Errors.INVALID_REQUEST
+ else if (!txnManager.isCoordinatorFor(transactionalId))
+ Errors.NOT_COORDINATOR
+ else if (txnManager.isCoordinatorLoadingInProgress(transactionalId))
+ Errors.COORDINATOR_LOAD_IN_PROGRESS
+ else
+ Errors.NONE
+
+ def handleAddPartitionsToTransaction(transactionalId: String,
+ pid: Long,
+ epoch: Short,
+ partitions: collection.Set[TopicPartition],
+ responseCallback: TxnMetadataUpdateCallback): Unit = {
+ val errors = validateTransactionalId(transactionalId)
+ if (errors != Errors.NONE)
+ responseCallback(errors)
+ else {
+ // try to update the transaction metadata and append the updated metadata to txn log;
+ // if there is no such metadata, treat it as an invalid pid mapping error.
+ val (error, newMetadata) = txnManager.getTransactionState(transactionalId) match {
+ case None =>
+ (Errors.INVALID_PID_MAPPING, null)
+
+ case Some(metadata) =>
+ // generate the new transaction metadata with added partitions
+ metadata synchronized {
+ if (metadata.pid != pid) {
+ (Errors.INVALID_PID_MAPPING, null)
+ } else if (metadata.producerEpoch != epoch) {
+ (Errors.INVALID_PRODUCER_EPOCH, null)
+ } else if (metadata.pendingState.isDefined) {
+ // return a retriable exception to let the client backoff and retry
+ (Errors.CONCURRENT_TRANSACTIONS, null)
+ } else if (metadata.state != Empty && metadata.state != Ongoing) {
+ (Errors.INVALID_TXN_STATE, null)
+ } else if (partitions.subsetOf(metadata.topicPartitions)) {
+ // this is an optimization: if the partitions are already in the metadata, reply OK immediately
+ (Errors.NONE, null)
+ } else {
+ val now = time.milliseconds()
+ val newMetadata = new TransactionMetadata(pid,
+ epoch,
+ metadata.txnTimeoutMs,
+ Ongoing,
+ metadata.topicPartitions ++ partitions,
+ if (metadata.state == Empty) now else metadata.transactionStartTime,
+ now)
+ metadata.prepareTransitionTo(Ongoing)
+ (Errors.NONE, newMetadata)
+ }
+ }
+ }
+
+ if (newMetadata != null) {
+ appendToLogInReadLock(transactionalId, newMetadata, responseCallback)
+ } else {
+ responseCallback(error)
+ }
+ }
+ }
+
+ def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int) {
+ inWriteLock(coordinatorLock) {
+ txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
+ }
+ }
+
+ def handleTxnEmigration(transactionStateTopicPartitionId: Int) {
+ inWriteLock(coordinatorLock) {
+ txnManager.removeTransactionsForPartition(transactionStateTopicPartitionId)
+ txnMarkerChannelManager.removeStateForPartition(transactionStateTopicPartitionId)
+ }
+ }
+
+ def handleEndTransaction(transactionalId: String,
+ pid: Long,
+ epoch: Short,
+ command: TransactionResult,
+ responseCallback: EndTxnCallback): Unit = {
+ val errors = validateTransactionalId(transactionalId)
+ if (errors != Errors.NONE)
+ responseCallback(errors)
+ else
+ txnManager.getTransactionState(transactionalId) match {
+ case None =>
+ responseCallback(Errors.INVALID_PID_MAPPING)
+ case Some(metadata) =>
+ metadata synchronized {
+ if (metadata.pid != pid)
+ responseCallback(Errors.INVALID_PID_MAPPING)
+ else if (metadata.producerEpoch != epoch)
+ responseCallback(Errors.INVALID_PRODUCER_EPOCH)
+ else metadata.state match {
+ case Ongoing =>
+ commitOrAbort(transactionalId, pid, epoch, command, responseCallback, metadata)
+ case CompleteCommit =>
+ if (command == TransactionResult.COMMIT)
+ responseCallback(Errors.NONE)
+ else
+ responseCallback(Errors.INVALID_TXN_STATE)
+ case CompleteAbort =>
+ if (command == TransactionResult.ABORT)
+ responseCallback(Errors.NONE)
+ else
+ responseCallback(Errors.INVALID_TXN_STATE)
+ case _ =>
+ responseCallback(Errors.INVALID_TXN_STATE)
+ }
+ }
+ }
+ }
+
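+ // Appends to the txn log while holding the coordinator read lock so that partition immigration
+ // and emigration (which take the write lock) cannot interleave with an in-flight append; the read
+ // lock is released in the append callback, or immediately if the append itself throws.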
+ private def appendToLogInReadLock(transactionalId: String,
+ metadata: TransactionMetadata,
+ callback: Errors => Unit): Unit = {
+ def unlockCallback(errors: Errors): Unit = {
+ coordinatorLock.readLock().unlock()
+ callback(errors)
+ }
+ coordinatorLock.readLock().lock()
+ try {
+ txnManager.appendTransactionToLog(transactionalId,
+ metadata,
+ unlockCallback)
+ } catch {
+ case _: Throwable => coordinatorLock.readLock().unlock()
+ }
+ }
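+
+ // Ending a transaction is two-phase: the Prepare{Commit,Abort} state is appended to the txn log
+ // (the client is answered as soon as that append completes), markers are then written to every
+ // partition in the transaction, and finally the Complete{Commit,Abort} state is appended,
+ // retrying on transient errors until this broker is no longer the coordinator.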
+ private def commitOrAbort(transactionalId: String,
+ pid: Long,
+ epoch: Short,
+ command: TransactionResult,
+ responseCallback: EndTxnCallback,
+ metadata: TransactionMetadata) = {
+ val nextState = if (command == TransactionResult.COMMIT) PrepareCommit else PrepareAbort
+ val newMetadata = new TransactionMetadata(pid,
+ epoch,
+ metadata.txnTimeoutMs,
+ nextState,
+ metadata.topicPartitions,
+ metadata.transactionStartTime,
+ time.milliseconds())
+ metadata.prepareTransitionTo(nextState)
+
+ def logAppendCallback(errors: Errors): Unit = {
+ // we can respond to the client immediately and continue to write the txn markers if
+ // the log append was successful
+ responseCallback(errors)
+ if (errors == Errors.NONE)
+ txnManager.coordinatorEpochFor(transactionalId) match {
+ case Some(coordinatorEpoch) =>
+ def completionCallback(error: Errors): Unit = {
+ error match {
+ case Errors.NONE =>
+ txnManager.getTransactionState(transactionalId) match {
+ case Some(preparedCommitMetadata) =>
+ val completedState = if (nextState == PrepareCommit) CompleteCommit else CompleteAbort
+ val committedMetadata = new TransactionMetadata(pid,
+ epoch,
+ preparedCommitMetadata.txnTimeoutMs,
+ completedState,
+ preparedCommitMetadata.topicPartitions,
+ preparedCommitMetadata.transactionStartTime,
+ time.milliseconds())
+ preparedCommitMetadata.prepareTransitionTo(completedState)
+
+ def writeCommittedTransactionCallback(error: Errors): Unit =
+ error match {
+ case Errors.NONE =>
+ txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
+ case Errors.NOT_COORDINATOR =>
+ // this one should be completed by the new coordinator
+ warn(s"no longer the coordinator for transactionalId: $transactionalId")
+ case _ =>
+ warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
+ // retry until success
+ appendToLogInReadLock(transactionalId, committedMetadata, writeCommittedTransactionCallback)
+ }
+
+ appendToLogInReadLock(transactionalId, committedMetadata, writeCommittedTransactionCallback)
+ case None =>
+ // this one should be completed by the new coordinator
+ warn(s"no longer the coordinator for transactionalId: $transactionalId")
+ }
+ case Errors.NOT_COORDINATOR =>
+ warn(s"no longer the coordinator for transactionalId: $transactionalId")
+ case _ =>
+ warn(s"error: $error caught when writing transaction markers for transactionalId: $transactionalId. retrying")
+ txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId),
+ newMetadata,
+ coordinatorEpoch,
+ completionCallback)
+ }
+ }
+
+ txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId), newMetadata, coordinatorEpoch, completionCallback)
+ case None =>
+ // this one should be completed by the new coordinator
+ warn(s"no longer the coordinator for transactionalId: $transactionalId")
+ }
+ }
+
+ txnManager.appendTransactionToLog(transactionalId, newMetadata, logAppendCallback)
+ }
+
+ def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
+
+ def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
+
+ /**
+ * Startup logic executed when the server starts up.
+ */
+ def startup(enablePidExpiration: Boolean = true) {
+ info("Starting up.")
+ if (enablePidExpiration)
+ txnManager.enablePidExpiration()
+ txnMarkerChannelManager.start()
+ isActive.set(true)
+
+ info("Startup complete.")
+ }
+
+ /**
+ * Shutdown logic executed when the server shuts down.
+ * The ordering of actions should be the reverse of the startup process.
+ */
+ def shutdown() {
+ info("Shutting down.")
+ isActive.set(false)
+ pidManager.shutdown()
+ txnManager.shutdown()
+ txnMarkerChannelManager.shutdown()
+ info("Shutdown complete.")
+ }
+}
+
+case class InitPidResult(pid: Long, epoch: Short, error: Errors)
\ No newline at end of file