You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/26 21:11:10 UTC

[07/10] kafka git commit: KAFKA-5059: Implement Transactional Coordinator

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
new file mode 100644
index 0000000..063eee7
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -0,0 +1,1123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import java.io.PrintStream
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
+
+import com.yammer.metrics.core.Gauge
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
+import kafka.common.{MessageFormatter, _}
+import kafka.coordinator.group._
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.ReplicaManager
+import kafka.utils.CoreUtils.inLock
+import kafka.utils._
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.types.Type._
+import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
+import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.collection.JavaConverters._
+import scala.collection._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Maintains the broker-local cache of group metadata and committed offsets, and reads/writes that
+ * state from/to the group metadata topic through the replica manager.
+ *
+ * @param brokerId                   id of this broker; used in the log prefix and in log messages
+ * @param interBrokerProtocolVersion inter-broker protocol version; selects the group metadata value schema version
+ * @param config                     offset management configuration (retention, load buffer size, commit timeout/acks, ...)
+ * @param replicaManager             used to append records to, and read the log of, the group metadata topic
+ * @param zkUtils                    used to look up the partition count of the group metadata topic
+ * @param time                       clock used for record timestamps and for timing load/cleanup operations
+ */
+class GroupMetadataManager(brokerId: Int,
+                           interBrokerProtocolVersion: ApiVersion,
+                           config: OffsetConfig,
+                           replicaManager: ReplicaManager,
+                           zkUtils: ZkUtils,
+                           time: Time) extends Logging with KafkaMetricsGroup {
+
+  /* compression applied to every record batch this manager writes to the group metadata topic */
+  private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
+
+  /* cache of group metadata, keyed by group id */
+  private val groupMetadataCache = new Pool[String, GroupMetadata]
+
+  /* lock protecting access to loading and owned partition sets */
+  private val partitionLock = new ReentrantLock()
+
+  /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE the group lock if needed */
+  private val loadingPartitions: mutable.Set[Int] = mutable.Set()
+
+  /* partitions of consumer groups that are assigned, using the same loading partition lock */
+  private val ownedPartitions: mutable.Set[Int] = mutable.Set()
+
+  /* shutting down flag */
+  private val shuttingDown = new AtomicBoolean(false)
+
+  /* number of partitions for the consumer metadata topic */
+  private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
+
+  /* single-thread scheduler to handle offset/group metadata cache loading and unloading */
+  private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
+
+  this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
+
+  // total number of cached offsets, summed over all cached groups (each group is read under its own monitor)
+  newGauge("NumOffsets",
+    new Gauge[Int] {
+      def value = groupMetadataCache.values.map(group => {
+        group synchronized { group.numOffsets }
+      }).sum
+    }
+  )
+
+  // number of groups currently held in the cache
+  newGauge("NumGroups",
+    new Gauge[Int] {
+      def value = groupMetadataCache.size
+    }
+  )
+
+  /**
+   * Start the background scheduler and register the periodic task that runs
+   * cleanupGroupMetadata every config.offsetsRetentionCheckIntervalMs milliseconds.
+   */
+  def enableMetadataExpiration(): Unit = {
+    scheduler.startup()
+
+    val checkIntervalMs = config.offsetsRetentionCheckIntervalMs
+    scheduler.schedule(
+      name = "delete-expired-group-metadata",
+      fun = cleanupGroupMetadata,
+      period = checkIntervalMs,
+      unit = TimeUnit.MILLISECONDS)
+  }
+
+  /** All groups currently held in the cache. */
+  def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
+
+  /** Whether the given partition of the group metadata topic is in the owned set. */
+  def isPartitionOwned(partition: Int): Boolean = inLock(partitionLock) {
+    ownedPartitions.contains(partition)
+  }
+
+  /** Whether the given partition of the group metadata topic is in the loading set. */
+  def isPartitionLoading(partition: Int): Boolean = inLock(partitionLock) {
+    loadingPartitions.contains(partition)
+  }
+
+  /** Partition of the group metadata topic that the given group id hashes to. */
+  def partitionFor(groupId: String): Int = {
+    val hash = Utils.abs(groupId.hashCode)
+    hash % groupMetadataTopicPartitionCount
+  }
+
+  /** Whether the partition the group maps to is owned by this manager. */
+  def isGroupLocal(groupId: String): Boolean = {
+    val partition = partitionFor(groupId)
+    isPartitionOwned(partition)
+  }
+
+  /** Whether the partition the group maps to is currently being loaded. */
+  def isGroupLoading(groupId: String): Boolean = {
+    val partition = partitionFor(groupId)
+    isPartitionLoading(partition)
+  }
+
+  /** Whether any partition is currently being loaded. */
+  def isLoading(): Boolean = inLock(partitionLock) {
+    loadingPartitions.nonEmpty
+  }
+
+  /**
+   * Get the group associated with the given groupId.
+   *
+   * @param groupId id of the group to look up
+   * @return Some(group) if the group is in the cache, None otherwise (the cache's null result is wrapped in Option)
+   */
+  def getGroup(groupId: String): Option[GroupMetadata] = {
+    Option(groupMetadataCache.get(groupId))
+  }
+
+  /**
+   * Register the group in the cache unless a group with the same id is already cached.
+   *
+   * @param group the group to add
+   * @return the group already cached under this id if there is one, otherwise the group passed in
+   */
+  def addGroup(group: GroupMetadata): GroupMetadata =
+    Option(groupMetadataCache.putIfNotExists(group.groupId, group)).getOrElse(group)
+
+  /**
+   * Build the group metadata record for the given group, together with the callback to run once the
+   * record has been appended to the group metadata topic. Nothing is appended here; the returned
+   * DelayedStore is meant to be passed to store().
+   *
+   * @param group            the group whose metadata should be persisted
+   * @param groupAssignment  assignment bytes keyed by string; passed through to groupMetadataValue
+   * @param responseCallback invoked with the (translated) result of the append, or immediately with
+   *                         NOT_COORDINATOR when no message format version is available for the partition
+   * @return Some(DelayedStore) holding the records and the append callback, or None if getMagic returned
+   *         nothing for the group's partition
+   */
+  def prepareStoreGroup(group: GroupMetadata,
+                        groupAssignment: Map[String, Array[Byte]],
+                        responseCallback: Errors => Unit): Option[DelayedStore] = {
+    getMagic(partitionFor(group.groupId)) match {
+      case Some(magicValue) =>
+        // value schema version 0 is written while the inter-broker protocol is older than 0.10.1
+        val groupMetadataValueVersion = {
+          if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
+            0.toShort
+          else
+            GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
+        }
+
+        // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+        val timestampType = TimestampType.CREATE_TIME
+        val timestamp = time.milliseconds()
+        val key = GroupMetadataManager.groupMetadataKey(group.groupId)
+        val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion)
+
+        // a single-record batch, in a buffer sized from the estimate for that one record
+        val records = {
+          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
+            Seq(new SimpleRecord(timestamp, key, value)).asJava))
+          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
+          builder.append(timestamp, key, value)
+          builder.build()
+        }
+
+        val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+        val groupMetadataRecords = Map(groupMetadataPartition -> records)
+        // captured now so that the log messages below report the generation that was written
+        val generationId = group.generationId
+
+        // callback run after the log append completes: it translates the append error and forwards
+        // the result to responseCallback (it does not touch the cache itself)
+        def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+          // the append response should only contain the topics partition
+          if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
+            throw new IllegalStateException("Append status %s should only have one partition %s"
+              .format(responseStatus, groupMetadataPartition))
+
+          // construct the error status to propagate in the response
+          val status = responseStatus(groupMetadataPartition)
+
+          val responseError = if (status.error == Errors.NONE) {
+            Errors.NONE
+          } else {
+            debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
+              s"due to ${status.error.exceptionName}")
+
+            // transform the log append error code to the corresponding coordinator error code
+            status.error match {
+              case Errors.UNKNOWN_TOPIC_OR_PARTITION
+                   | Errors.NOT_ENOUGH_REPLICAS
+                   | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+                Errors.COORDINATOR_NOT_AVAILABLE
+
+              case Errors.NOT_LEADER_FOR_PARTITION =>
+                Errors.NOT_COORDINATOR
+
+              case Errors.REQUEST_TIMED_OUT =>
+                Errors.REBALANCE_IN_PROGRESS
+
+              case Errors.MESSAGE_TOO_LARGE
+                   | Errors.RECORD_LIST_TOO_LARGE
+                   | Errors.INVALID_FETCH_SIZE =>
+
+                error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
+                  s"${status.error.exceptionName}, returning UNKNOWN error code to the client")
+
+                Errors.UNKNOWN
+
+              case other =>
+                error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
+                  s"due to unexpected error: ${status.error.exceptionName}")
+
+                other
+            }
+          }
+
+          responseCallback(responseError)
+        }
+        Some(DelayedStore(groupMetadataRecords, putCacheCallback))
+
+      case None =>
+        // no message format version for the partition: report NOT_COORDINATOR right away
+        responseCallback(Errors.NOT_COORDINATOR)
+        None
+    }
+  }
+
+  /**
+   * Hand the prepared records to the replica manager for appending; the DelayedStore's callback is
+   * passed along as the completion callback.
+   */
+  def store(delayedStore: DelayedStore): Unit = {
+    val timeoutMs = config.offsetCommitTimeoutMs.toLong
+    val requiredAcks = config.offsetCommitRequiredAcks
+    // the group metadata topic is an internal topic, so appends to internal topics must be allowed
+    val allowInternalTopics = true
+
+    replicaManager.appendRecords(
+      timeoutMs,
+      requiredAcks,
+      allowInternalTopics,
+      delayedStore.partitionRecords,
+      delayedStore.callback)
+  }
+
+  /**
+   * Build the offset commit records for the given group, together with the callback that updates the
+   * group's pending offsets once the records have been appended. Nothing is appended here; the returned
+   * DelayedStore is meant to be passed to store().
+   *
+   * Offsets whose metadata string is longer than config.maxMetadataSize are not written and are
+   * reported as OFFSET_METADATA_TOO_LARGE.
+   *
+   * @param group            the group the offsets are committed for
+   * @param consumerId       id of the committing consumer (used for logging only)
+   * @param generationId     generation of the commit (used for logging only)
+   * @param offsetMetadata   offsets to commit, keyed by topic partition
+   * @param responseCallback invoked with one error code per partition in offsetMetadata
+   * @return Some(DelayedStore) when there is something to append, None when every offset was rejected or
+   *         no message format version is available for the partition (responseCallback has then already
+   *         been invoked)
+   */
+  def prepareStoreOffsets(group: GroupMetadata,
+                          consumerId: String,
+                          generationId: Int,
+                          offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Option[DelayedStore] = {
+    // first filter out partitions with offset metadata size exceeding limit
+    val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
+      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+    }
+
+    // construct the message set to append
+    if (filteredOffsetMetadata.isEmpty) {
+      // compute the final error codes for the commit response
+      val commitStatus = offsetMetadata.mapValues(_ => Errors.OFFSET_METADATA_TOO_LARGE)
+      responseCallback(commitStatus)
+      None
+    } else {
+      getMagic(partitionFor(group.groupId)) match {
+        case Some(magicValue) =>
+          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+          val timestampType = TimestampType.CREATE_TIME
+          val timestamp = time.milliseconds()
+
+          // one record per accepted offset, all sharing the same timestamp
+          val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
+            val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
+            val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+            new SimpleRecord(timestamp, key, value)
+          }
+          val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
+          val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
+          val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L)
+          records.foreach(builder.append)
+          val entries = Map(offsetTopicPartition -> builder.build())
+
+          // set the callback function to insert offsets into cache after log append completed
+          def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
+            // the append response should only contain the topics partition
+            if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
+              throw new IllegalStateException("Append status %s should only have one partition %s"
+                .format(responseStatus, offsetTopicPartition))
+
+            // construct the commit response status and insert
+            // the offset and metadata to cache if the append status has no error
+            val status = responseStatus(offsetTopicPartition)
+
+            // the pending-offset bookkeeping is updated under the group's monitor; a Dead group is left untouched
+            val responseError = group synchronized {
+              if (status.error == Errors.NONE) {
+                if (!group.is(Dead)) {
+                  filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
+                    group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
+                  }
+                }
+                Errors.NONE
+              } else {
+                if (!group.is(Dead)) {
+                  filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
+                    group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
+                  }
+                }
+
+                debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
+                  s"with generation $generationId failed when appending to log due to ${status.error.exceptionName}")
+
+                // transform the log append error code to the corresponding commit status error code
+                status.error match {
+                  case Errors.UNKNOWN_TOPIC_OR_PARTITION
+                       | Errors.NOT_ENOUGH_REPLICAS
+                       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+                    Errors.COORDINATOR_NOT_AVAILABLE
+
+                  case Errors.NOT_LEADER_FOR_PARTITION =>
+                    Errors.NOT_COORDINATOR
+
+                  case Errors.MESSAGE_TOO_LARGE
+                       | Errors.RECORD_LIST_TOO_LARGE
+                       | Errors.INVALID_FETCH_SIZE =>
+                    Errors.INVALID_COMMIT_OFFSET_SIZE
+
+                  case other => other
+                }
+              }
+            }
+
+            // compute the final error codes for the commit response: partitions that were filtered out
+            // above get OFFSET_METADATA_TOO_LARGE, all others share the append result
+            val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
+              if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+                (topicPartition, responseError)
+              else
+                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+            }
+
+            // finally trigger the callback logic passed from the API layer
+            responseCallback(commitStatus)
+          }
+
+          // NOTE(review): the unfiltered offsetMetadata is registered as pending here, whereas the callback
+          // above only completes/fails the entries of filteredOffsetMetadata -- confirm in GroupMetadata that
+          // entries rejected for oversized metadata do not linger as pending commits.
+          group synchronized {
+            group.prepareOffsetCommit(offsetMetadata)
+          }
+
+          Some(DelayedStore(entries, putCacheCallback))
+
+        case None =>
+          // no message format version for the partition: every partition is answered with NOT_COORDINATOR
+          val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
+            (topicPartition, Errors.NOT_COORDINATOR)
+          }
+          responseCallback(commitStatus)
+          None
+      }
+    }
+  }
+
+  /**
+   * Look up the cached offsets of a group.
+   *
+   * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
+   * returns the current offset or it begins to sync the cache from the log (and returns an error code).
+   *
+   * @param groupId            the group to fetch offsets for
+   * @param topicPartitionsOpt the partitions to fetch, or None to fetch every offset the group has
+   * @return one entry per requested partition (INVALID_OFFSET with empty metadata where nothing is cached);
+   *         when the group is unknown or Dead and no partitions were named, the result is empty
+   */
+  def getOffsets(groupId: String, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
+    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
+
+    // answer used for a partition that has no cached offset
+    def noOffset = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)
+
+    // answer used when the group is unknown or Dead: every requested partition has no offset
+    def noOffsetsForRequested: Map[TopicPartition, OffsetFetchResponse.PartitionData] =
+      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
+        (topicPartition, noOffset)
+      }.toMap
+
+    Option(groupMetadataCache.get(groupId)) match {
+      case None =>
+        noOffsetsForRequested
+
+      case Some(group) =>
+        group synchronized {
+          if (group.is(Dead))
+            noOffsetsForRequested
+          else
+            topicPartitionsOpt match {
+              case None =>
+                // Return offsets for all partitions owned by this consumer group. (this only applies to consumers
+                // that commit offsets to Kafka.)
+                group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
+                  topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
+                }
+
+              case Some(topicPartitions) =>
+                topicPartitions.map { topicPartition =>
+                  val partitionData = group.offset(topicPartition)
+                    .map { cached =>
+                      new OffsetFetchResponse.PartitionData(cached.offset, cached.metadata, Errors.NONE)
+                    }
+                    .getOrElse(noOffset)
+                  topicPartition -> partitionData
+                }.toMap
+            }
+        }
+    }
+  }
+
+  /**
+   * Asynchronously read the partition from the offsets topic and populate the cache.
+   *
+   * The load is scheduled on the manager's single-thread scheduler. If a load of the same partition is
+   * already in progress the scheduled task only logs that fact and does nothing else.
+   *
+   * @param offsetsPartition partition of the group metadata topic to load
+   * @param onGroupLoaded    callback invoked for every group that has been loaded
+   */
+  def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit): Unit = {
+    val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+
+    def doLoadGroupsAndOffsets(): Unit = {
+      info(s"Loading offsets and group metadata from $topicPartition")
+
+      // mutable.Set.add returns false when the element was already present, i.e. when another load of this
+      // partition is running. Using the result avoids a `return` from inside the by-name lock body, which
+      // would be a nonlocal return implemented by throwing a control exception.
+      val startedLoading = inLock(partitionLock) {
+        loadingPartitions.add(offsetsPartition)
+      }
+
+      if (!startedLoading) {
+        info(s"Offset load from $topicPartition already in progress.")
+      } else {
+        try {
+          loadGroupsAndOffsets(topicPartition, onGroupLoaded)
+        } catch {
+          case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
+        } finally {
+          // the partition is marked as owned whether or not the load succeeded
+          inLock(partitionLock) {
+            ownedPartitions.add(offsetsPartition)
+            loadingPartitions.remove(offsetsPartition)
+          }
+        }
+      }
+    }
+
+    scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
+  }
+
+  /**
+   * Replay the given partition of the group metadata topic, from its log start offset up to the log end
+   * offset reported by the replica manager, and populate the cache with the groups and offsets found.
+   *
+   * Records are first folded into local maps (a later record for a key overrides an earlier one, a record
+   * without a value removes the key). Only after the whole log has been read are the groups materialized
+   * in the cache and onGroupLoaded invoked, so that each group is loaded exactly once from its final
+   * state rather than from whatever prefix of the log happened to fit into one read.
+   *
+   * @param topicPartition partition of the group metadata topic to read
+   * @param onGroupLoaded  callback invoked once for every group that has been loaded
+   * @throws IllegalStateException if a record has an unexpected key type, or if a group that the log marks
+   *                               as removed is present in the cache
+   */
+  private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
+    // re-evaluated on every loop iteration; -1 when the replica manager reports no log end offset
+    def highWaterMark = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
+
+    val startMs = time.milliseconds()
+    replicaManager.getLog(topicPartition) match {
+      case None =>
+        warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
+
+      case Some(log) =>
+        var currOffset = log.logStartOffset
+        val buffer = ByteBuffer.allocate(config.loadBufferSize)
+        val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
+        val removedOffsets = mutable.Set[GroupTopicPartition]()
+        val loadedGroups = mutable.Map[String, GroupMetadata]()
+        val removedGroups = mutable.Set[String]()
+
+        // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
+        // NOTE(review): if a read yields no complete batch, currOffset does not advance -- confirm that
+        // config.loadBufferSize is guaranteed to hold at least one batch.
+        while (currOffset < highWaterMark && !shuttingDown.get()) {
+          buffer.clear()
+          val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true)
+            .records.asInstanceOf[FileRecords]
+          val bufferRead = fileRecords.readInto(buffer, 0)
+
+          MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch =>
+            for (record <- batch.asScala) {
+              require(record.hasKey, "Group metadata/offset entry key should not be null")
+              GroupMetadataManager.readMessageKey(record.key) match {
+
+                case offsetKey: OffsetKey =>
+                  // load offset
+                  val key = offsetKey.key
+                  if (!record.hasValue) {
+                    loadedOffsets.remove(key)
+                    removedOffsets.add(key)
+                  } else {
+                    val value = GroupMetadataManager.readOffsetMessageValue(record.value)
+                    loadedOffsets.put(key, value)
+                    removedOffsets.remove(key)
+                  }
+
+                case groupMetadataKey: GroupMetadataKey =>
+                  // load group metadata
+                  val groupId = groupMetadataKey.key
+                  val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+                  if (groupMetadata != null) {
+                    trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
+                    removedGroups.remove(groupId)
+                    loadedGroups.put(groupId, groupMetadata)
+                  } else {
+                    loadedGroups.remove(groupId)
+                    removedGroups.add(groupId)
+                  }
+
+                case unknownKey =>
+                  throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
+              }
+            }
+
+            // advance once per batch, so that a batch without any records still moves the read position
+            currOffset = batch.nextOffset
+          }
+        }
+
+        // split the loaded offsets into those of groups that have metadata in the log and those that do not
+        val (groupOffsets, emptyGroupOffsets) = loadedOffsets
+          .groupBy(_._1.group)
+          .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
+          .partition { case (group, _) => loadedGroups.contains(group) }
+
+        loadedGroups.values.foreach { group =>
+          val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
+          loadGroup(group, offsets)
+          onGroupLoaded(group)
+        }
+
+        // load groups which store offsets in kafka, but which have no active members and thus no group
+        // metadata stored in the log
+        emptyGroupOffsets.foreach { case (groupId, offsets) =>
+          val group = new GroupMetadata(groupId)
+          loadGroup(group, offsets)
+          onGroupLoaded(group)
+        }
+
+        removedGroups.foreach { groupId =>
+          // if the cache already contains a group which should be removed, raise an error. Note that it
+          // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
+          // offset storage (i.e. by "simple" consumers)
+          if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
+            throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
+              s"loading partition $topicPartition")
+        }
+
+        if (!shuttingDown.get())
+          info("Finished loading offsets from %s in %d milliseconds."
+            .format(topicPartition, time.milliseconds() - startMs))
+    }
+  }
+
+  /**
+   * Initialize the group with the given offsets and then try to add it to the cache. If a group with the
+   * same id is already cached, the cached group is kept and the failed attempt is logged at debug level.
+   */
+  private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {
+    val defaultTimestamp = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
+
+    // special handling for version 0:
+    // set the expiration time stamp as commit time stamp + server default retention time
+    def withExpiration(committed: OffsetAndMetadata): OffsetAndMetadata =
+      if (committed.expireTimestamp != defaultTimestamp)
+        committed
+      else
+        committed.copy(expireTimestamp = committed.commitTimestamp + config.offsetsRetentionMs)
+
+    // offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent
+    // view of the group's offsets
+    val loadedOffsets = offsets.mapValues(withExpiration)
+    trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}")
+    group.initializeOffsets(loadedOffsets)
+
+    val currentGroup = addGroup(group)
+    val alreadyCached = group != currentGroup
+    if (alreadyCached) {
+      debug(s"Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed " +
+        s"because there is already a cached group with generation ${currentGroup.generationId}")
+    }
+  }
+
+  /**
+   * When this broker becomes a follower for an offsets topic partition clear out the cache for groups that belong to
+   * that partition. The removal runs asynchronously on the manager's scheduler.
+   *
+   * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
+   * @param onGroupUnloaded  Callback invoked for each group just before it is removed from the cache.
+   */
+  def removeGroupsForPartition(offsetsPartition: Int,
+                               onGroupUnloaded: GroupMetadata => Unit): Unit = {
+    val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+
+    def removeGroupsAndOffsets(): Unit = {
+      var numOffsetsRemoved = 0
+      var numGroupsRemoved = 0
+
+      inLock(partitionLock) {
+        // we need to guard the group removal in cache in the loading partition lock
+        // to prevent coordinator's check-and-get-group race condition
+        ownedPartitions.remove(offsetsPartition)
+
+        for (group <- groupMetadataCache.values if partitionFor(group.groupId) == offsetsPartition) {
+          onGroupUnloaded(group)
+          groupMetadataCache.remove(group.groupId, group)
+          numGroupsRemoved += 1
+          numOffsetsRemoved += group.numOffsets
+        }
+      }
+
+      if (numOffsetsRemoved > 0) {
+        info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.")
+      }
+
+      if (numGroupsRemoved > 0) {
+        info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.")
+      }
+    }
+
+    scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
+  }
+
+  // visible for testing
+  /**
+   * Run one cleanup pass that removes expired offsets (no deleted partitions are given).
+   */
+  private[group] def cleanupGroupMetadata(): Unit = {
+    cleanupGroupMetadata(None)
+  }
+
+  /**
+   * Remove offsets from every cached group and write tombstones for them to the group metadata topic.
+   * Groups that are Empty and hold no offsets afterwards are transitioned to Dead and removed from the cache.
+   *
+   * @param deletedTopicPartitions if defined, the offsets of exactly these partitions are removed;
+   *                               if None, the offsets that have expired as of the start of this pass are removed
+   */
+  def cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]) {
+    val startMs = time.milliseconds()
+    var offsetsRemoved = 0
+
+    groupMetadataCache.foreach { case (groupId, group) =>
+      // everything that reads or mutates the group happens under its monitor; the results are
+      // captured so that the log append below runs without holding it
+      val (removedOffsets, groupIsDead, generation) = group synchronized {
+        val removedOffsets = deletedTopicPartitions match {
+          case Some(topicPartitions) => group.removeOffsets(topicPartitions)
+          case None => group.removeExpiredOffsets(startMs)
+        }
+
+        if (group.is(Empty) && !group.hasOffsets) {
+          info(s"Group $groupId transitioned to Dead in generation ${group.generationId}")
+          group.transitionTo(Dead)
+        }
+        (removedOffsets, group.is(Dead), group.generationId)
+      }
+
+      val offsetsPartition = partitionFor(groupId)
+      val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+      getMagic(offsetsPartition) match {
+        case Some(magicValue) =>
+          // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+          val timestampType = TimestampType.CREATE_TIME
+          val timestamp = time.milliseconds()
+
+          val partitionOpt = replicaManager.getPartition(appendPartition)
+          partitionOpt.foreach { partition =>
+            // a tombstone is a record with the entry's key and a null value
+            val tombstones = ListBuffer.empty[SimpleRecord]
+            removedOffsets.foreach { case (topicPartition, offsetAndMetadata) =>
+              trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
+              val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
+              tombstones += new SimpleRecord(timestamp, commitKey, null)
+            }
+            trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
+
+            // We avoid writing the tombstone when the generationId is 0, since this group is only using
+            // Kafka for offset storage.
+            // Note that the cache removal is the middle operand: a Dead group is removed from the cache
+            // even when generation is 0 and no group tombstone is written.
+            if (groupIsDead && groupMetadataCache.remove(groupId, group) && generation > 0) {
+              // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+              // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+              // retry removing this group.
+              val groupMetadataKey = GroupMetadataManager.groupMetadataKey(group.groupId)
+              tombstones += new SimpleRecord(timestamp, groupMetadataKey, null)
+              trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.")
+            }
+
+            if (tombstones.nonEmpty) {
+              try {
+                // do not need to require acks since even if the tombstone is lost,
+                // it will be appended again in the next purge cycle
+                val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*)
+                partition.appendRecordsToLeader(records)
+                // counted only after a successful append
+                offsetsRemoved += removedOffsets.size
+                trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
+                  s"offsets and/or metadata for group $groupId")
+              } catch {
+                case t: Throwable =>
+                  error(s"Failed to append ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
+                    s"offsets and/or metadata for group $groupId.", t)
+                // ignore and continue
+              }
+            }
+          }
+
+        case None =>
+          // no message format version for the partition; nothing is appended for this group
+          info(s"BrokerId $brokerId is no longer a coordinator for the group $groupId. Proceeding cleanup for other alive groups")
+      }
+    }
+
+    info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
+  }
+
+  /*
+   * Offset metadata is acceptable when it is absent (null) or no longer than config.maxMetadataSize
+   */
+  private def validateOffsetMetadataLength(metadata: String) : Boolean =
+    Option(metadata).forall(_.length <= config.maxMetadataSize)
+
+
+  /**
+   * Raise the shutting-down flag and stop the scheduler if it was started.
+   */
+  def shutdown(): Unit = {
+    // set the flag first: the partition load loop checks it on every iteration
+    shuttingDown.set(true)
+    if (scheduler.isStarted) {
+      scheduler.shutdown()
+    }
+
+    // TODO: clear the caches
+  }
+
+  /**
+   * Look up how many partitions the group metadata topic has in ZooKeeper.
+   * When ZooKeeper has no count for the topic, fall back to config.offsetsTopicNumPartitions.
+   */
+  private def getGroupMetadataTopicPartitionCount: Int = {
+    val countInZk = zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)
+    countInZk.getOrElse(config.offsetsTopicNumPartitions)
+  }
+
+  /**
+   * Ask the replica manager for the message format version (magic byte) of the given partition of the
+   * group metadata topic.
+   *
+   * @param   partition  Partition of GroupMetadataTopic
+   * @return  Some(magic) if the replica manager returns one for the partition, None otherwise
+   */
+  private def getMagic(partition: Int): Option[Byte] =
+    replicaManager.getMagic(new TopicPartition(Topic.GroupMetadataTopicName, partition))
+
+  /**
+   * Mark the given partition as owned, under the partition lock.
+   *
+   * NOTE: this is for test only
+   */
+  def addPartitionOwnership(partition: Int): Unit = {
+    inLock(partitionLock) {
+      ownedPartitions += partition
+    }
+  }
+}
+
+/**
+ * Messages stored for the group topic have versions for both the key and value fields. Key version indicates the
+ * type of the message (and keeps different types of messages with the same field values from being compacted
+ * together); value version is used to evolve the messages within their data types:
+ *
+ * key version 0:       group consumption offset
+ *    -> value version 0:       [offset, metadata, timestamp]
+ *
+ * key version 1:       group consumption offset
+ *    -> value version 1:       [offset, metadata, commit_timestamp, expire_timestamp]
+ *
+ * key version 2:       group metadata
+ *     -> value version 0:       [protocol_type, generation, protocol, leader, members]
+ *     -> value version 1:       [protocol_type, generation, protocol, leader, members] (members add rebalance_timeout)
+ */
+object GroupMetadataManager {
+
+  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
+  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
+
+  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
+    new Field("topic", STRING),
+    new Field("partition", INT32))
+  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
+  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
+  private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
+  private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
+  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64),
+    new Field("expire_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
+  private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
+  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
+
+  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
+  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
+
+  private val MEMBER_ID_KEY = "member_id"
+  private val CLIENT_ID_KEY = "client_id"
+  private val CLIENT_HOST_KEY = "client_host"
+  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
+  private val SESSION_TIMEOUT_KEY = "session_timeout"
+  private val SUBSCRIPTION_KEY = "subscription"
+  private val ASSIGNMENT_KEY = "assignment"
+
+  private val MEMBER_METADATA_V0 = new Schema(
+    new Field(MEMBER_ID_KEY, STRING),
+    new Field(CLIENT_ID_KEY, STRING),
+    new Field(CLIENT_HOST_KEY, STRING),
+    new Field(SESSION_TIMEOUT_KEY, INT32),
+    new Field(SUBSCRIPTION_KEY, BYTES),
+    new Field(ASSIGNMENT_KEY, BYTES))
+
+  private val MEMBER_METADATA_V1 = new Schema(
+    new Field(MEMBER_ID_KEY, STRING),
+    new Field(CLIENT_ID_KEY, STRING),
+    new Field(CLIENT_HOST_KEY, STRING),
+    new Field(REBALANCE_TIMEOUT_KEY, INT32),
+    new Field(SESSION_TIMEOUT_KEY, INT32),
+    new Field(SUBSCRIPTION_KEY, BYTES),
+    new Field(ASSIGNMENT_KEY, BYTES))
+
+  private val PROTOCOL_TYPE_KEY = "protocol_type"
+  private val GENERATION_KEY = "generation"
+  private val PROTOCOL_KEY = "protocol"
+  private val LEADER_KEY = "leader"
+  private val MEMBERS_KEY = "members"
+
+  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
+
+  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
+
+
+  // map of versions to key schemas as data types
+  private val MESSAGE_TYPE_SCHEMAS = Map(
+    0 -> OFFSET_COMMIT_KEY_SCHEMA,
+    1 -> OFFSET_COMMIT_KEY_SCHEMA,
+    2 -> GROUP_METADATA_KEY_SCHEMA)
+
+  // map of version of offset value schemas
+  private val OFFSET_VALUE_SCHEMAS = Map(
+    0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
+    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
+  private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
+
+  // map of version of group metadata value schemas
+  private val GROUP_VALUE_SCHEMAS = Map(
+    0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
+    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
+  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
+
+  private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+
+  private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+  private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
+
+  // Looks up the key schema for a message key version; keys cover both offset commits and group metadata.
+  private def schemaForKey(version: Int) = {
+    MESSAGE_TYPE_SCHEMAS.get(version) match {
+      case Some(schema) => schema
+      case _ => throw new KafkaException("Unknown message key schema version " + version)
+    }
+  }
+
+  private def schemaForOffset(version: Int) = {
+    val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
+    schemaOpt match {
+      case Some(schema) => schema
+      case _ => throw new KafkaException("Unknown offset schema version " + version)
+    }
+  }
+
+  private def schemaForGroup(version: Int) = {
+    val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
+    schemaOpt match {
+      case Some(schema) => schema
+      case _ => throw new KafkaException("Unknown group metadata version " + version)
+    }
+  }
+
+  /**
+   * Generates the key for offset commit message for given (group, topic, partition)
+   * NOTE: versionId is not read here; the key is always written with CURRENT_OFFSET_KEY_SCHEMA_VERSION
+   * @return key for offset commit message
+   */
+  private[group] def offsetCommitKey(group: String, topicPartition: TopicPartition,
+                                           versionId: Short = 0): Array[Byte] = {
+    val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
+    key.set(OFFSET_KEY_GROUP_FIELD, group)
+    key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
+    key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
+    byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
+    key.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Generates the key for group metadata message for given group
+   *
+   * @return key bytes for group metadata message
+   */
+  private[group] def groupMetadataKey(group: String): Array[Byte] = {
+    val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
+    key.set(GROUP_KEY_GROUP_FIELD, group)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
+    byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+    key.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Generates the payload for offset commit message from given offset and metadata
+   *
+   * @param offsetAndMetadata consumer's current offset and metadata
+   * @return payload for offset commit message
+   */
+  private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
+    // generate commit value with schema version 1
+    val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
+    value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+    value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+    value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+    value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
+    byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+    value.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Generates the payload for group metadata message from given group metadata and member assignment,
+   * assuming the generation id, selected protocol, leader and member assignment are all available
+   *
+   * @param groupMetadata current group metadata
+   * @param assignment the assignment for the rebalancing generation
+   * @param version the version of the value message to use
+   * @return payload for group metadata message
+   */
+  private[group] def groupMetadataValue(groupMetadata: GroupMetadata,
+                                        assignment: Map[String, Array[Byte]],
+                                        version: Short = 0): Array[Byte] = {
+    val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
+
+    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
+    value.set(GENERATION_KEY, groupMetadata.generationId)
+    value.set(PROTOCOL_KEY, groupMetadata.protocol)
+    value.set(LEADER_KEY, groupMetadata.leaderId)
+
+    val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
+      val memberStruct = value.instance(MEMBERS_KEY)
+      memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
+      memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
+      memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
+      memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
+
+      if (version > 0)
+        memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
+
+      val metadata = memberMetadata.metadata(groupMetadata.protocol)
+      memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
+
+      val memberAssignment = assignment(memberMetadata.memberId)
+      assert(memberAssignment != null)
+
+      memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
+
+      memberStruct
+    }
+
+    value.set(MEMBERS_KEY, memberArray.toArray)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
+    byteBuffer.putShort(version)
+    value.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Decodes the key of a message from the group metadata topic (offset commit key or group metadata key)
+   *
+   * @param buffer input byte-buffer
+   * @return an OffsetKey for key versions 0 and 1, or a GroupMetadataKey for key version 2
+   */
+  def readMessageKey(buffer: ByteBuffer): BaseKey = {
+    val version = buffer.getShort
+    val keySchema = schemaForKey(version)
+    val key = keySchema.read(buffer)
+
+    if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
+      // version 0 and 1 refer to offset
+      val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
+      val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
+      val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
+
+      OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))
+
+    } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
+      // version 2 refers to group metadata
+      val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]
+
+      GroupMetadataKey(version, group)
+    } else {
+      throw new IllegalStateException("Unknown version " + version + " for group metadata message")
+    }
+  }
+
+  /**
+   * Decodes the offset messages' payload and retrieves offset and metadata from it
+   *
+   * @param buffer input byte-buffer
+   * @return an offset-metadata object from the message
+   */
+  def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
+    if (buffer == null) { // tombstone
+      null
+    } else {
+      val version = buffer.getShort
+      val valueSchema = schemaForOffset(version)
+      val value = valueSchema.read(buffer)
+
+      if (version == 0) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]
+        val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, timestamp)
+      } else if (version == 1) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
+        val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      } else {
+        throw new IllegalStateException("Unknown offset message version")
+      }
+    }
+  }
+
+  /**
+   * Decodes the group metadata messages' payload and retrieves its member metadata from it
+   *
+   * @param buffer input byte-buffer
+   * @return a group metadata object from the message
+   */
+  def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
+    if (buffer == null) { // tombstone
+      null
+    } else {
+      val version = buffer.getShort
+      val valueSchema = schemaForGroup(version)
+      val value = valueSchema.read(buffer)
+
+      if (version == 0 || version == 1) {
+        val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
+
+        val memberMetadataArray = value.getArray(MEMBERS_KEY)
+        val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
+
+        val group = new GroupMetadata(groupId, initialState)
+
+        group.generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
+        group.leaderId = value.get(LEADER_KEY).asInstanceOf[String]
+        group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
+
+        memberMetadataArray.foreach { memberMetadataObj =>
+          val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
+          val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
+          val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
+          val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
+          val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
+          val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
+
+          val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
+
+          val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
+            protocolType, List((group.protocol, subscription)))
+
+          member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
+
+          group.add(member)
+        }
+
+        group
+      } else {
+        throw new IllegalStateException("Unknown group metadata message version")
+      }
+    }
+  }
+
+  // Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
+  // (specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
+  class OffsetsMessageFormatter extends MessageFormatter {
+    def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+      Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+        // Only print if the message is an offset record.
+        // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+        case offsetKey: OffsetKey =>
+          val groupTopicPartition = offsetKey.key
+          val value = consumerRecord.value
+          val formattedValue =
+            if (value == null) "NULL"
+            else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
+          output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+        case _ => // no-op
+      }
+    }
+  }
+
+  // Formatter for use with tools to read group metadata history
+  class GroupMetadataMessageFormatter extends MessageFormatter {
+    def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
+      Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
+        // Only print if the message is a group metadata record.
+        // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
+        case groupMetadataKey: GroupMetadataKey =>
+          val groupId = groupMetadataKey.key
+          val value = consumerRecord.value
+          val formattedValue =
+            if (value == null) "NULL"
+            else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+          output.write(groupId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+        case _ => // no-op
+      }
+    }
+  }
+
+}
+
+case class DelayedStore(partitionRecords: Map[TopicPartition, MemoryRecords],
+                        callback: Map[TopicPartition, PartitionResponse] => Unit)
+
+case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
+
+  def this(group: String, topic: String, partition: Int) =
+    this(group, new TopicPartition(topic, partition))
+
+  override def toString: String =
+    "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
+}
+
+trait BaseKey{
+  def version: Short
+  def key: Any
+}
+
+case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey {
+
+  override def toString: String = key.toString
+}
+
+case class GroupMetadataKey(version: Short, key: String) extends BaseKey {
+
+  override def toString: String = key
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
new file mode 100644
index 0000000..ce542b1
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/MemberMetadata.scala
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import java.util
+
+import kafka.utils.nonthreadsafe
+import org.apache.kafka.common.protocol.Errors
+
+
+case class MemberSummary(memberId: String,
+                         clientId: String,
+                         clientHost: String,
+                         metadata: Array[Byte],
+                         assignment: Array[Byte])
+
+/**
+ * Member metadata contains the following metadata:
+ *
+ * Heartbeat metadata:
+ * 1. negotiated heartbeat session timeout
+ * 2. timestamp of the latest heartbeat
+ *
+ * Protocol metadata:
+ * 1. the list of supported protocols (ordered by preference)
+ * 2. the metadata associated with each protocol
+ *
+ * In addition, it also contains the following state information:
+ *
+ * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
+ *                                 its rebalance callback will be kept in the metadata if the
+ *                                 member has sent the join group request
+ * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
+ *                            is kept in metadata until the leader provides the group assignment
+ *                            and the group transitions to stable
+ */
+@nonthreadsafe
+private[group] class MemberMetadata(val memberId: String,
+                                    val groupId: String,
+                                    val clientId: String,
+                                    val clientHost: String,
+                                    val rebalanceTimeoutMs: Int,
+                                    val sessionTimeoutMs: Int,
+                                    val protocolType: String,
+                                    var supportedProtocols: List[(String, Array[Byte])]) {
+
+  var assignment: Array[Byte] = Array.empty[Byte]
+  var awaitingJoinCallback: JoinGroupResult => Unit = null
+  var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null
+  var latestHeartbeat: Long = -1
+  var isLeaving: Boolean = false
+
+  def protocols = supportedProtocols.map(_._1).toSet
+
+  /**
+   * Get metadata corresponding to the provided protocol.
+   */
+  def metadata(protocol: String): Array[Byte] = {
+    supportedProtocols.find(_._1 == protocol) match {
+      case Some((_, metadata)) => metadata
+      case None =>
+        throw new IllegalArgumentException("Member does not support protocol")
+    }
+  }
+
+  /**
+   * Check if the provided protocol metadata matches the currently stored metadata: both lists must have
+   * the same length and, position by position, equal protocol names and byte-wise equal metadata.
+   *
+   * @param protocols ordered (protocol name, metadata) pairs to compare against supportedProtocols
+   * @return true if every pair matches the stored pair at the same position
+   */
+  def matches(protocols: List[(String, Array[Byte])]): Boolean = {
+    // walk both lists in lock-step instead of indexing them: a positional lookup on a List
+    // costs O(i), which made the original index loop quadratic in the number of protocols
+    protocols.size == supportedProtocols.size &&
+      protocols.zip(supportedProtocols).forall { case ((name, metadata), (storedName, storedMetadata)) =>
+        name == storedName && util.Arrays.equals(metadata, storedMetadata)
+      }
+  }
+
+  def summary(protocol: String): MemberSummary = {
+    MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment)
+  }
+
+  def summaryNoMetadata(): MemberSummary = {
+    MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte])
+  }
+
+  /**
+   * Vote for one of the potential group protocols. This takes into account the protocol preference as
+   * indicated by the order of supported protocols and returns the first one also contained in the set
+   */
+  def vote(candidates: Set[String]): String = {
+    supportedProtocols.find({ case (protocol, _) => candidates.contains(protocol)}) match {
+      case Some((protocol, _)) => protocol
+      case None =>
+        throw new IllegalArgumentException("Member does not support any of the candidate protocols")
+    }
+  }
+
+  override def toString = {
+    "[%s,%s,%s,%d]".format(memberId, clientId, clientHost, sessionTimeoutMs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala b/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
new file mode 100644
index 0000000..2b9a60f
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/OffsetConfig.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.message.{CompressionCodec, NoCompressionCodec}
+
+/**
+ * Configuration settings for in-built offset management
+ * @param maxMetadataSize The maximum allowed metadata for any offset commit.
+ * @param loadBufferSize Batch size for reading from the offsets segments when loading offsets into the cache.
+ * @param offsetsRetentionMs Offsets older than this retention period will be discarded.
+ * @param offsetsRetentionCheckIntervalMs Frequency at which to check for expired offsets.
+ * @param offsetsTopicNumPartitions The number of partitions for the offset commit topic (should not change after deployment).
+ * @param offsetsTopicSegmentBytes The offsets topic segment bytes should be kept relatively small to facilitate faster
+ *                                 log compaction and faster offset loads
+ * @param offsetsTopicReplicationFactor The replication factor for the offset commit topic (set higher to ensure availability).
+ * @param offsetsTopicCompressionCodec Compression codec for the offsets topic - compression should be turned on in
+ *                                     order to achieve "atomic" commits.
+ * @param offsetCommitTimeoutMs The offset commit will be delayed until all replicas for the offsets topic receive the
+ *                              commit or this timeout is reached. (Similar to the producer request timeout.)
+ * @param offsetCommitRequiredAcks The required acks before the commit can be accepted. In general, the default (-1)
+ *                                 should not be overridden.
+ */
+case class OffsetConfig(maxMetadataSize: Int = OffsetConfig.DefaultMaxMetadataSize,
+                        loadBufferSize: Int = OffsetConfig.DefaultLoadBufferSize,
+                        offsetsRetentionMs: Long = OffsetConfig.DefaultOffsetRetentionMs,
+                        offsetsRetentionCheckIntervalMs: Long = OffsetConfig.DefaultOffsetsRetentionCheckIntervalMs,
+                        offsetsTopicNumPartitions: Int = OffsetConfig.DefaultOffsetsTopicNumPartitions,
+                        offsetsTopicSegmentBytes: Int = OffsetConfig.DefaultOffsetsTopicSegmentBytes,
+                        offsetsTopicReplicationFactor: Short = OffsetConfig.DefaultOffsetsTopicReplicationFactor,
+                        offsetsTopicCompressionCodec: CompressionCodec = OffsetConfig.DefaultOffsetsTopicCompressionCodec,
+                        offsetCommitTimeoutMs: Int = OffsetConfig.DefaultOffsetCommitTimeoutMs,
+                        offsetCommitRequiredAcks: Short = OffsetConfig.DefaultOffsetCommitRequiredAcks)
+
+object OffsetConfig {
+  val DefaultMaxMetadataSize = 4096
+  val DefaultLoadBufferSize = 5*1024*1024
+  val DefaultOffsetRetentionMs = 24*60*60*1000L
+  val DefaultOffsetsRetentionCheckIntervalMs = 600000L
+  val DefaultOffsetsTopicNumPartitions = 50
+  val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
+  val DefaultOffsetsTopicReplicationFactor = 3.toShort
+  val DefaultOffsetsTopicCompressionCodec = NoCompressionCodec
+  val DefaultOffsetCommitTimeoutMs = 5000
+  val DefaultOffsetCommitRequiredAcks = (-1).toShort
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
new file mode 100644
index 0000000..9300e6a
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.transaction
+
+import kafka.server.DelayedOperation
+import org.apache.kafka.common.protocol.Errors
+
+/**
+  * Delayed transaction state change operation that is added to the purgatory effectively without timeout. It
+  * completes once txnMetadata.topicPartitions is empty and then invokes completionCallback with Errors.NONE.
+  */
+private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
+                                           completionCallback: Errors => Unit)
+  // ~100 years instead of Long.MaxValue: presumably the timer adds this delay to the current clock, which
+  // would overflow for Long.MaxValue and expire the operation right away -- TODO confirm against the timer
+  extends DelayedOperation(java.util.concurrent.TimeUnit.DAYS.toMillis(100 * 365)) {
+
+  // overridden since tryComplete already synchronizes on the existing txn metadata (NOTE(review): the earlier "group lock" wording presumably meant this lock -- confirm)
+  override def safeTryComplete(): Boolean = tryComplete()
+
+  override def tryComplete(): Boolean = {
+    txnMetadata synchronized {
+      if (txnMetadata.topicPartitions.isEmpty)
+        forceComplete()
+      else false
+    }
+  }
+
+  // this should never happen given the ~100 year delay
+  override def onExpiration(): Unit =
+    throw new IllegalStateException(s"Delayed write txn marker operation for metadata $txnMetadata has timed out, this should never happen.")
+
+  override def onComplete(): Unit = completionCallback(Errors.NONE)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
new file mode 100644
index 0000000..bb7f57b
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.transaction
+
+import kafka.common.KafkaException
+import kafka.utils.{Json, Logging, ZkUtils}
+
+/**
+ * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds (PIDs) in a unique way
+ * such that the same PID will not be assigned twice across multiple transaction coordinators.
+ *
+ * PIDs are managed via ZooKeeper, where the latest pid block is written on the corresponding ZK path by the manager who
+ * claims the block. The block is stored as JSON with the fields "version", "broker", "block_start" and "block_end";
+ * block_start and block_end are both inclusive and are written as strings.
+ */
+object ProducerIdManager extends Logging {
+  val CurrentVersion: Long = 1L
+  val PidBlockSize: Long = 1000L
+
+  /**
+   * Serializes a pid block into the JSON form stored in ZooKeeper.
+   *
+   * @param pidBlock the block to serialize
+   * @return the JSON string; the block boundaries are encoded as strings rather than as JSON numbers
+   */
+  def generatePidBlockJson(pidBlock: ProducerIdBlock): String = {
+    Json.encode(Map("version" -> CurrentVersion,
+      "broker" -> pidBlock.brokerId,
+      "block_start" -> pidBlock.blockStartPid.toString,
+      "block_end" -> pidBlock.blockEndPid.toString)
+    )
+  }
+
+  /**
+   * Parses the JSON form produced by [[generatePidBlockJson]] back into a pid block.
+   *
+   * @param jsonData the JSON string read from ZooKeeper
+   * @return the decoded pid block
+   * @throws KafkaException if the JSON parser yields no result for the data
+   * @throws NumberFormatException if a block boundary does not fit into a Long (logged as fatal first)
+   */
+  def parsePidBlockData(jsonData: String): ProducerIdBlock = {
+    try {
+      Json.parseFull(jsonData).map { m =>
+        val pidBlockInfo = m.asInstanceOf[Map[String, Any]]
+        val brokerId = pidBlockInfo("broker").asInstanceOf[Int]
+        // the boundaries are stored as strings, so they are converted back to Long here
+        val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong
+        val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong
+        ProducerIdBlock(brokerId, blockStartPID, blockEndPID)
+      }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData"))
+    } catch {
+      case e: java.lang.NumberFormatException =>
+        // this should never happen: the written data has exceeded long type limit
+        fatal(s"Read json data $jsonData contains pids that have exceeded long type limit")
+        throw e
+    }
+  }
+}
+
+/**
+ * A block of producer ids claimed by a broker.
+ *
+ * @param brokerId      id of the broker that claimed the block
+ * @param blockStartPid first pid of the block
+ * @param blockEndPid   last pid of the block
+ */
+case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) {
+  // rendered as (brokerId:<id>,blockStartPID:<start>,blockEndPID:<end>)
+  override def toString: String =
+    s"(brokerId:$brokerId,blockStartPID:$blockStartPid,blockEndPID:$blockEndPid)"
+}
+
+/**
+ * Hands out producer ids (PIDs) one at a time from a block of ids that was claimed by writing it to the
+ * pid block path in ZooKeeper. A first block is claimed when the manager is constructed, and a new block
+ * is claimed whenever the current one has been used up.
+ *
+ * @param brokerId id of the broker recorded as the owner of every block this manager claims
+ * @param zkUtils  ZooKeeper access used to read and conditionally update the pid block path
+ */
+class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging {
+
+  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+  // the block most recently claimed in ZooKeeper by this manager; only replaced after a confirmed write
+  private var currentPIDBlock: ProducerIdBlock = null
+  // the pid that the next call to nextPid() will return
+  private var nextPID: Long = -1L
+
+  // grab the first block of PIDs
+  this synchronized {
+    getNewPidBlock()
+    nextPID = currentPIDBlock.blockStartPid
+  }
+
+  /**
+   * Claims the block following the one currently stored in ZooKeeper (or the very first block if none is
+   * stored yet), retrying until the conditional write succeeds.
+   *
+   * The candidate block is kept in a local value and only published to currentPIDBlock once the write has
+   * been confirmed. If a retry iteration throws (for instance a failed read, or pid exhaustion), the manager
+   * therefore still refers to the last block it actually owns and never hands out ids from an unclaimed block.
+   *
+   * @throws KafkaException if the next block would exceed the range of Long
+   */
+  private def getNewPidBlock(): Unit = {
+    var zkWriteComplete = false
+    while (!zkWriteComplete) {
+      // refresh current pid block from zookeeper again
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+
+      // generate the new pid block, without touching the manager's state yet
+      val candidatePIDBlock = dataOpt match {
+        case Some(data) =>
+          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+          debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion")
+
+          if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+            // we have exhausted all pids (wow!), treat it as a fatal error
+            fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})")
+            throw new KafkaException("Have exhausted all pids.")
+          }
+
+          ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize)
+        case None =>
+          debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block")
+          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+      }
+
+      val newPIDBlockData = ProducerIdManager.generatePidBlockJson(candidatePIDBlock)
+
+      // try to write the new pid block into zookeeper
+      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData))
+      zkWriteComplete = succeeded
+
+      if (zkWriteComplete) {
+        // the block is ours now, so it is safe to start serving pids from it
+        currentPIDBlock = candidatePIDBlock
+        info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version")
+      }
+    }
+  }
+
+  /**
+   * Checker handed to the conditional ZooKeeper update: re-reads the pid block path and reports whether
+   * the stored block equals the block we tried to write.
+   * NOTE(review): presumably ZkUtils calls this when the conditional update hits a version conflict - confirm.
+   *
+   * @param zkUtils      ZooKeeper access to read with
+   * @param path         the path being updated (only used in the warning message)
+   * @param expectedData the JSON we attempted to write
+   * @return (true, current zk version) if the stored block matches the expected one, (false, -1) if the
+   *         path holds no data or the check itself failed, and (false, current zk version) otherwise
+   */
+  private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+    try {
+      val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData)
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+      dataOpt match {
+        case Some(data) =>
+          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+          (currPIDBlock.equals(expectedPidBlock), zkVersion)
+        case None =>
+          (false, -1)
+      }
+    } catch {
+      case e: Exception =>
+        warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e)
+
+        (false, -1)
+    }
+  }
+
+  /**
+   * Returns the next unused pid, claiming a new block from ZooKeeper first if the current block has been
+   * used up.
+   *
+   * @return a pid taken from a block claimed by this manager
+   * @throws KafkaException if a new block is needed but all pids have been exhausted
+   */
+  def nextPid(): Long = {
+    this synchronized {
+      // grab a new block of PIDs if this block has been exhausted
+      if (nextPID > currentPIDBlock.blockEndPid) {
+        getNewPidBlock()
+        nextPID = currentPIDBlock.blockStartPid + 1
+      } else {
+        nextPID += 1
+      }
+
+      // nextPID has already been advanced, so the pid handed out is the one just before it
+      nextPID - 1
+    }
+  }
+
+  /**
+   * Logs that the manager has shut down.
+   * NOTE(review): the message reports nextPID, which is the pid that would be handed out next.
+   */
+  def shutdown(): Unit = {
+    info(s"Shutdown complete: last PID assigned $nextPID")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
new file mode 100644
index 0000000..343f7ea
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.transaction
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager}
+import kafka.utils.{Logging, Scheduler, ZkUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests.TransactionResult
+import org.apache.kafka.common.utils.Time
+import kafka.utils.CoreUtils.inWriteLock
+
+
+object TransactionCoordinator {
+
+  /**
+   * Builds a coordinator from broker-level components: derives the TransactionConfig from the broker
+   * config, then creates the pid manager, the transaction state manager, the txn marker purgatory and
+   * the txn marker channel manager that the coordinator is constructed with.
+   */
+  def apply(config: KafkaConfig,
+            replicaManager: ReplicaManager,
+            scheduler: Scheduler,
+            zkUtils: ZkUtils,
+            metrics: Metrics,
+            metadataCache: MetadataCache,
+            time: Time): TransactionCoordinator = {
+
+    val transactionConfig = TransactionConfig(
+      config.transactionalIdExpirationMs,
+      config.transactionMaxTimeoutMs,
+      config.transactionTopicPartitions,
+      config.transactionTopicReplicationFactor,
+      config.transactionTopicSegmentBytes,
+      config.transactionsLoadBufferSize,
+      config.transactionTopicMinISR)
+
+    val localBrokerId = config.brokerId
+
+    // the components are created in the same order as before: pid manager, state manager, purgatory, channel manager
+    val producerIdManager = new ProducerIdManager(localBrokerId, zkUtils)
+    val stateManager = new TransactionStateManager(localBrokerId, zkUtils, scheduler, replicaManager, transactionConfig, time)
+    val markerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", localBrokerId)
+    val markerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, markerPurgatory, time)
+
+    new TransactionCoordinator(localBrokerId, producerIdManager, stateManager, markerChannelManager, markerPurgatory, time)
+  }
+
+  // failed InitPid result: carries the "no producer id" / "no producer epoch" placeholders plus the error
+  private def initTransactionError(error: Errors): InitPidResult =
+    InitPidResult(pid = RecordBatch.NO_PRODUCER_ID, epoch = RecordBatch.NO_PRODUCER_EPOCH, error = error)
+
+  // successful InitPid result: exposes the pid and producer epoch held by the given metadata
+  private def initTransactionMetadata(txnMetadata: TransactionMetadata): InitPidResult =
+    InitPidResult(pid = txnMetadata.pid, epoch = txnMetadata.producerEpoch, error = Errors.NONE)
+}
+
+/**
+ * Transaction coordinator handles message transactions sent by producers and communicate with brokers
+ * to update ongoing transaction's status.
+ *
+ * Each Kafka server instantiates a transaction coordinator which is responsible for a set of
+ * producers. Producers with specific transactional ids are assigned to their corresponding coordinators;
+ * Producers with no specific transactional id may talk to a random broker as their coordinators.
+ */
+class TransactionCoordinator(brokerId: Int,
+                             pidManager: ProducerIdManager,
+                             txnManager: TransactionStateManager,
+                             txnMarkerChannelManager: TransactionMarkerChannelManager,
+                             txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker],
+                             time: Time) extends Logging {
+  this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
+
+  import TransactionCoordinator._
+
+  type InitPidCallback = InitPidResult => Unit
+  type TxnMetadataUpdateCallback = Errors => Unit
+  type EndTxnCallback = Errors => Unit
+
+  /* Active flag of the coordinator */
+  private val isActive = new AtomicBoolean(false)
+
+  // the write side is taken while transaction state partitions are loaded or removed, the read side
+  // while a transaction log append is being issued (see appendToLogInReadLock)
+  private val coordinatorLock = new ReentrantReadWriteLock
+
+  /**
+   * Handles an InitPid request.
+   *
+   * Without a transactional id a fresh pid with epoch 0 is returned right away. Otherwise the request is
+   * rejected if this broker is not the coordinator for the id, if the coordinator is still loading, or if
+   * the requested timeout is not accepted by the state manager; if none of these apply, the transaction
+   * metadata for the id is created or updated and appended to the transaction log.
+   *
+   * @param transactionalId      the producer's transactional id; may be null or empty
+   * @param transactionTimeoutMs the transaction timeout requested by the producer
+   * @param responseCallback     invoked with the resulting pid/epoch or with an error
+   */
+  def handleInitPid(transactionalId: String,
+                    transactionTimeoutMs: Int,
+                    responseCallback: InitPidCallback): Unit = {
+    if (transactionalId == null || transactionalId.isEmpty) {
+      // if the transactional id is not specified, then always blindly accept the request
+      // and return a new pid from the pid manager
+      val pid = pidManager.nextPid()
+      responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
+    } else if (!txnManager.isCoordinatorFor(transactionalId)) {
+      // check if it is the assigned coordinator for the transactional id
+      responseCallback(initTransactionError(Errors.NOT_COORDINATOR))
+    } else if (txnManager.isCoordinatorLoadingInProgress(transactionalId)) {
+      responseCallback(initTransactionError(Errors.COORDINATOR_LOAD_IN_PROGRESS))
+    } else if (!txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
+      // check transactionTimeoutMs is not larger than the broker configured maximum allowed value
+      responseCallback(initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT))
+    } else {
+      // only try to get a new pid and update the cache if the transactional id is unknown
+      txnManager.getTransactionState(transactionalId) match {
+        case None =>
+          val pid = pidManager.nextPid()
+          val newMetadata: TransactionMetadata = new TransactionMetadata(pid = pid,
+            producerEpoch = 0,
+            txnTimeoutMs = transactionTimeoutMs,
+            state = Empty,
+            topicPartitions = collection.mutable.Set.empty[TopicPartition],
+            lastUpdateTimestamp = time.milliseconds())
+
+          val metadata = txnManager.addTransaction(transactionalId, newMetadata)
+
+          // there might be a concurrent thread that has just updated the mapping
+          // with the transactional id at the same time; in this case we will
+          // treat it as the metadata has existed and update it accordingly
+          metadata synchronized {
+            if (!metadata.eq(newMetadata))
+              initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, responseCallback, metadata)
+            else
+              appendMetadataToLog(transactionalId, metadata, responseCallback)
+
+          }
+        case Some(metadata) =>
+          initPidWithExistingMetadata(transactionalId, transactionTimeoutMs, responseCallback, metadata)
+      }
+    }
+  }
+
+  /**
+   * Appends the metadata to the transaction log and answers the InitPid request once the append has
+   * completed: with the metadata's pid and epoch on success, or with the append error otherwise.
+   */
+  private def appendMetadataToLog(transactionalId: String,
+                                  metadata: TransactionMetadata,
+                                  initPidCallback: InitPidCallback): Unit = {
+    def callback(errors: Errors): Unit = {
+      if (errors == Errors.NONE)
+        initPidCallback(initTransactionMetadata(metadata))
+      else
+        initPidCallback(initTransactionError(errors))
+    }
+    appendToLogInReadLock(transactionalId, metadata, callback)
+  }
+
+
+  /**
+   * Handles InitPid for a transactional id that already has metadata, depending on the current state:
+   *  - Ongoing: the transaction is aborted first, and InitPid is handled again once that succeeded;
+   *  - PrepareAbort / PrepareCommit: a delayed operation waits for completion, then InitPid is handled again;
+   *  - any other state: the producer epoch is bumped, the metadata is reset to Empty and appended to the log.
+   */
+  private def initPidWithExistingMetadata(transactionalId: String,
+                                          transactionTimeoutMs: Int,
+                                          responseCallback: InitPidCallback,
+                                          metadata: TransactionMetadata) = {
+
+    metadata synchronized {
+      if (metadata.state == Ongoing) {
+        // abort the ongoing transaction
+        handleEndTransaction(transactionalId,
+          metadata.pid,
+          metadata.producerEpoch,
+          TransactionResult.ABORT,
+          (errors: Errors) => {
+            if (errors != Errors.NONE) {
+              responseCallback(initTransactionError(errors))
+            } else {
+              // init pid again
+              handleInitPid(transactionalId, transactionTimeoutMs, responseCallback)
+            }
+          })
+      } else if (metadata.state == PrepareAbort || metadata.state == PrepareCommit) {
+        // wait for the commit to complete and then init pid again
+        txnMarkerPurgatory.tryCompleteElseWatch(new DelayedTxnMarker(metadata, (errors: Errors) => {
+          if (errors != Errors.NONE)
+            responseCallback(initTransactionError(errors))
+          else
+            handleInitPid(transactionalId, transactionTimeoutMs, responseCallback)
+        }), Seq(metadata.pid))
+      } else {
+        // NOTE(review): the epoch is a Short and this increment is not guarded against wrapping around
+        metadata.producerEpoch = (metadata.producerEpoch + 1).toShort
+        metadata.txnTimeoutMs = transactionTimeoutMs
+        metadata.topicPartitions.clear()
+        metadata.lastUpdateTimestamp = time.milliseconds()
+        metadata.state = Empty
+        appendMetadataToLog(transactionalId, metadata, responseCallback)
+      }
+    }
+  }
+
+  /**
+   * Common pre-checks for requests that must carry a transactional id.
+   *
+   * @return INVALID_REQUEST for a null or empty id, NOT_COORDINATOR if this broker does not coordinate the
+   *         id, COORDINATOR_LOAD_IN_PROGRESS while the coordinator is loading, and NONE otherwise
+   */
+  private def validateTransactionalId(transactionalId: String): Errors =
+    if (transactionalId == null || transactionalId.isEmpty)
+      Errors.INVALID_REQUEST
+    else if (!txnManager.isCoordinatorFor(transactionalId))
+      Errors.NOT_COORDINATOR
+    else if (txnManager.isCoordinatorLoadingInProgress(transactionalId))
+      Errors.COORDINATOR_LOAD_IN_PROGRESS
+    else
+      Errors.NONE
+
+
+  /**
+   * Handles a request to add partitions to the producer's transaction.
+   *
+   * The request is validated against the cached metadata (pid, epoch, pending state transition, current
+   * state). If all partitions are already part of the transaction the callback is answered immediately;
+   * otherwise new metadata in state Ongoing that includes the partitions is appended to the transaction log.
+   *
+   * @param transactionalId  the producer's transactional id
+   * @param pid              the producer id sent with the request
+   * @param epoch            the producer epoch sent with the request
+   * @param partitions       the partitions to add to the transaction
+   * @param responseCallback invoked with the validation error or with the result of the log append
+   */
+  def handleAddPartitionsToTransaction(transactionalId: String,
+                                       pid: Long,
+                                       epoch: Short,
+                                       partitions: collection.Set[TopicPartition],
+                                       responseCallback: TxnMetadataUpdateCallback): Unit = {
+    val errors = validateTransactionalId(transactionalId)
+    if (errors != Errors.NONE)
+      responseCallback(errors)
+    else {
+      // try to update the transaction metadata and append the updated metadata to txn log;
+      // if there is no such metadata treat it as invalid pid mapping error.
+      // a null newMetadata means that nothing has to be appended and `error` is the final answer
+      val (error, newMetadata) = txnManager.getTransactionState(transactionalId) match {
+        case None =>
+          (Errors.INVALID_PID_MAPPING, null)
+
+        case Some(metadata) =>
+          // generate the new transaction metadata with added partitions
+          metadata synchronized {
+            if (metadata.pid != pid) {
+              (Errors.INVALID_PID_MAPPING, null)
+            } else if (metadata.producerEpoch != epoch) {
+              (Errors.INVALID_PRODUCER_EPOCH, null)
+            } else if (metadata.pendingState.isDefined) {
+              // return a retriable exception to let the client backoff and retry
+              (Errors.CONCURRENT_TRANSACTIONS, null)
+            } else if (metadata.state != Empty && metadata.state != Ongoing) {
+              (Errors.INVALID_TXN_STATE, null)
+            } else if (partitions.subsetOf(metadata.topicPartitions)) {
+              // this is an optimization: if the partitions are already in the metadata reply OK immediately
+              (Errors.NONE, null)
+            } else {
+              val now = time.milliseconds()
+              val newMetadata = new TransactionMetadata(pid,
+                epoch,
+                metadata.txnTimeoutMs,
+                Ongoing,
+                metadata.topicPartitions ++ partitions,
+                // the transaction starts now if it was empty so far, otherwise the start time is kept
+                if (metadata.state == Empty) now else metadata.transactionStartTime,
+                now)
+              metadata.prepareTransitionTo(Ongoing)
+              (Errors.NONE, newMetadata)
+            }
+          }
+      }
+
+      if (newMetadata != null) {
+        appendToLogInReadLock(transactionalId, newMetadata, responseCallback)
+      } else {
+        responseCallback(error)
+      }
+    }
+  }
+
+  /**
+   * Loads the transactions of the given transaction state topic partition while holding the write side
+   * of the coordinator lock.
+   */
+  def handleTxnImmigration(transactionStateTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
+    inWriteLock(coordinatorLock) {
+      txnManager.loadTransactionsForPartition(transactionStateTopicPartitionId, coordinatorEpoch)
+    }
+  }
+
+  /**
+   * Removes the transactions and the txn marker channel state of the given transaction state topic
+   * partition while holding the write side of the coordinator lock.
+   */
+  def handleTxnEmigration(transactionStateTopicPartitionId: Int): Unit = {
+    inWriteLock(coordinatorLock) {
+      txnManager.removeTransactionsForPartition(transactionStateTopicPartitionId)
+      txnMarkerChannelManager.removeStateForPartition(transactionStateTopicPartitionId)
+    }
+  }
+
+  /**
+   * Handles a request to commit or abort the producer's transaction.
+   *
+   * After validating the transactional id, pid and epoch, an Ongoing transaction is moved towards the
+   * requested result. A transaction that has already completed with the same result is answered with
+   * NONE; every other combination is answered with INVALID_TXN_STATE.
+   *
+   * @param transactionalId  the producer's transactional id
+   * @param pid              the producer id sent with the request
+   * @param epoch            the producer epoch sent with the request
+   * @param command          whether to commit or to abort
+   * @param responseCallback invoked with the outcome
+   */
+  def handleEndTransaction(transactionalId: String,
+                           pid: Long,
+                           epoch: Short,
+                           command: TransactionResult,
+                           responseCallback: EndTxnCallback): Unit = {
+    val errors = validateTransactionalId(transactionalId)
+    if (errors != Errors.NONE)
+      responseCallback(errors)
+    else
+      txnManager.getTransactionState(transactionalId) match {
+        case None =>
+          responseCallback(Errors.INVALID_PID_MAPPING)
+        case Some(metadata) =>
+          metadata synchronized {
+            if (metadata.pid != pid)
+              responseCallback(Errors.INVALID_PID_MAPPING)
+            else if (metadata.producerEpoch != epoch)
+              responseCallback(Errors.INVALID_PRODUCER_EPOCH)
+            else metadata.state match {
+              case Ongoing =>
+                commitOrAbort(transactionalId, pid, epoch, command, responseCallback, metadata)
+              case CompleteCommit =>
+                if (command == TransactionResult.COMMIT)
+                  responseCallback(Errors.NONE)
+                else
+                  responseCallback(Errors.INVALID_TXN_STATE)
+              case CompleteAbort =>
+                if (command == TransactionResult.ABORT)
+                  responseCallback(Errors.NONE)
+                else
+                  responseCallback(Errors.INVALID_TXN_STATE)
+              case _ =>
+                responseCallback(Errors.INVALID_TXN_STATE)
+            }
+          }
+      }
+  }
+
+  /**
+   * Issues a transaction log append while holding the read side of the coordinator lock.
+   *
+   * The lock is acquired and released by the calling thread around the call into the state manager.
+   * Read-lock holds of a ReentrantReadWriteLock are tracked per thread, so the lock must not be released
+   * from inside the completion callback, which may run on a different thread or may never run at all.
+   * Anything thrown by the append propagates to the caller after the lock has been released, instead of
+   * being silently dropped.
+   *
+   * @param transactionalId the transactional id the metadata belongs to
+   * @param metadata        the metadata to append
+   * @param callback        passed on to the state manager, to be invoked with the result of the append
+   */
+  private def appendToLogInReadLock(transactionalId: String,
+                                    metadata: TransactionMetadata,
+                                    callback: Errors => Unit): Unit = {
+    val readLock = coordinatorLock.readLock()
+    readLock.lock()
+    try {
+      txnManager.appendTransactionToLog(transactionalId, metadata, callback)
+    } finally {
+      readLock.unlock()
+    }
+  }
+
+  /**
+   * Moves an ongoing transaction to PrepareCommit / PrepareAbort, and once that has been written to the
+   * transaction log, answers the client, has the transaction markers written and finally appends the
+   * CompleteCommit / CompleteAbort state.
+   *
+   * Failures while writing the markers or while appending the completed state are retried, unless the
+   * error is NOT_COORDINATOR, in which case the work is left to the new coordinator.
+   *
+   * NOTE(review): the initial prepare append calls the state manager directly rather than going through
+   * appendToLogInReadLock like the other appends in this class - confirm this is intended.
+   */
+  private def commitOrAbort(transactionalId: String,
+                            pid: Long,
+                            epoch: Short,
+                            command: TransactionResult,
+                            responseCallback: EndTxnCallback,
+                            metadata: TransactionMetadata) = {
+    val nextState = if (command == TransactionResult.COMMIT) PrepareCommit else PrepareAbort
+    val newMetadata = new TransactionMetadata(pid,
+      epoch,
+      metadata.txnTimeoutMs,
+      nextState,
+      metadata.topicPartitions,
+      metadata.transactionStartTime,
+      time.milliseconds())
+    metadata.prepareTransitionTo(nextState)
+
+    def logAppendCallback(errors: Errors): Unit = {
+      // we can respond to the client immediately and continue to write the txn markers if
+      // the log append was successful
+      responseCallback(errors)
+      if (errors == Errors.NONE)
+        txnManager.coordinatorEpochFor(transactionalId) match {
+          case Some(coordinatorEpoch) =>
+            def completionCallback(error: Errors): Unit = {
+              error match {
+                case Errors.NONE =>
+                  txnManager.getTransactionState(transactionalId) match {
+                    case Some(preparedCommitMetadata) =>
+                      val completedState = if (nextState == PrepareCommit) CompleteCommit else CompleteAbort
+                      val committedMetadata = new TransactionMetadata(pid,
+                        epoch,
+                        preparedCommitMetadata.txnTimeoutMs,
+                        completedState,
+                        preparedCommitMetadata.topicPartitions,
+                        preparedCommitMetadata.transactionStartTime,
+                        time.milliseconds())
+                      preparedCommitMetadata.prepareTransitionTo(completedState)
+
+                      def writeCommittedTransactionCallback(error: Errors): Unit =
+                        error match {
+                          case Errors.NONE =>
+                            txnMarkerChannelManager.removeCompleted(txnManager.partitionFor(transactionalId), pid)
+                          case Errors.NOT_COORDINATOR =>
+                            // this one should be completed by the new coordinator
+                            warn(s"no longer the coordinator for transactionalId: $transactionalId")
+                          case _ =>
+                            warn(s"error: $error caught for transactionalId: $transactionalId when appending state: $completedState. retrying")
+                            // retry until success
+                            appendToLogInReadLock(transactionalId, committedMetadata, writeCommittedTransactionCallback)
+                        }
+
+                      appendToLogInReadLock(transactionalId, committedMetadata, writeCommittedTransactionCallback)
+                    case None =>
+                      // this one should be completed by the new coordinator
+                      warn(s"no longer the coordinator for transactionalId: $transactionalId")
+                  }
+                case Errors.NOT_COORDINATOR =>
+                  warn(s"no longer the coordinator for transactionalId: $transactionalId")
+                case _ =>
+                  warn(s"error: $error caught when writing transaction markers for transactionalId: $transactionalId. retrying")
+                  txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId),
+                    newMetadata,
+                    coordinatorEpoch,
+                    completionCallback)
+              }
+            }
+
+            txnMarkerChannelManager.addTxnMarkerRequest(txnManager.partitionFor(transactionalId), newMetadata, coordinatorEpoch, completionCallback)
+          case None =>
+            // this one should be completed by the new coordinator
+            warn(s"no longer the coordinator for transactionalId: $transactionalId")
+        }
+    }
+
+    txnManager.appendTransactionToLog(transactionalId, newMetadata, logAppendCallback)
+  }
+
+  /** The topic configs of the transaction topic, as provided by the transaction state manager. */
+  def transactionTopicConfigs: Properties = txnManager.transactionTopicConfigs
+
+  /** The transaction topic partition for the given transactional id, as computed by the transaction state manager. */
+  def partitionFor(transactionalId: String): Int = txnManager.partitionFor(transactionalId)
+
+  /**
+   * Startup logic executed at the same time when the server starts up.
+   *
+   * @param enablePidExpiration whether pid expiration should be enabled on the transaction state manager
+   */
+  def startup(enablePidExpiration: Boolean = true): Unit = {
+    info("Starting up.")
+    if (enablePidExpiration)
+      txnManager.enablePidExpiration()
+    txnMarkerChannelManager.start()
+    isActive.set(true)
+
+    info("Startup complete.")
+  }
+
+  /**
+   * Shutdown logic executed at the same time when server shuts down.
+   * Ordering of actions should be reversed from the startup process.
+   */
+  def shutdown(): Unit = {
+    info("Shutting down.")
+    isActive.set(false)
+    pidManager.shutdown()
+    txnManager.shutdown()
+    txnMarkerChannelManager.shutdown()
+    info("Shutdown complete.")
+  }
+}
+
+/**
+ * Result passed to the InitPid response callback.
+ *
+ * @param pid   the producer id returned to the producer
+ * @param epoch the producer epoch returned to the producer
+ * @param error Errors.NONE on success, otherwise the reason the request failed
+ */
+case class InitPidResult(pid: Long,
+                         epoch: Short,
+                         error: Errors)
\ No newline at end of file