Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/14 03:09:12 UTC

kafka git commit: KAFKA-3470: treat commits as member heartbeats

Repository: kafka
Updated Branches:
  refs/heads/trunk c1694833d -> f4d3d2865


KAFKA-3470: treat commits as member heartbeats

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>

Closes #1206 from hachikuji/KAFKA-3470


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4d3d286
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4d3d286
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4d3d286

Branch: refs/heads/trunk
Commit: f4d3d2865894f1a1ade9d92ac27931fd35d16cae
Parents: c169483
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Apr 13 18:09:08 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 13 18:09:08 2016 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    |  22 +++-
 .../scala/kafka/server/DelayedOperation.scala   |  68 ++++++-----
 .../scala/kafka/server/ReplicaManager.scala     |   4 +-
 .../main/scala/kafka/utils/timer/Timer.scala    |  51 +++++++-
 .../scala/kafka/utils/timer/TimerTask.scala     |   2 +-
 .../scala/kafka/utils/timer/TimerTaskList.scala |   7 +-
 .../scala/kafka/utils/timer/TimingWheel.scala   |   2 +-
 .../other/kafka/TestPurgatoryPerformance.scala  |   2 +-
 .../GroupCoordinatorResponseTest.scala          | 121 +++++++++++++++++--
 .../kafka/server/DelayedOperationTest.scala     |   2 +-
 .../test/scala/unit/kafka/utils/MockTime.scala  |  12 ++
 .../unit/kafka/utils/timer/MockTimer.scala      |  57 +++++++++
 .../kafka/utils/timer/TimerTaskListTest.scala   |  12 +-
 .../unit/kafka/utils/timer/TimerTest.scala      |  29 ++---
 14 files changed, 312 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 30a3a78..fb71254 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -51,6 +51,8 @@ class GroupCoordinator(val brokerId: Int,
                        val groupConfig: GroupConfig,
                        val offsetConfig: OffsetConfig,
                        val groupManager: GroupMetadataManager,
+                       val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+                       val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
                        time: Time) extends Logging {
   type JoinCallback = JoinGroupResult => Unit
   type SyncCallback = (Array[Byte], Short) => Unit
@@ -59,9 +61,6 @@ class GroupCoordinator(val brokerId: Int,
 
   private val isActive = new AtomicBoolean(false)
 
-  private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
-  private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null
-
   def offsetsTopicConfigs: Properties = {
     val props = new Properties
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
@@ -80,8 +79,6 @@ class GroupCoordinator(val brokerId: Int,
    */
   def startup() {
     info("Starting up.")
-    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
-    joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -414,6 +411,8 @@ class GroupCoordinator(val brokerId: Int,
           } else if (generationId != group.generationId) {
             responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
           } else {
+            val member = group.get(memberId)
+            completeAndScheduleNextHeartbeatExpiration(group, member)
             delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
               offsetMetadata, responseCallback))
           }
@@ -729,6 +728,17 @@ object GroupCoordinator {
             zkUtils: ZkUtils,
             replicaManager: ReplicaManager,
             time: Time): GroupCoordinator = {
+    val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
+    val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
+    apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
+  }
+
+  def apply(config: KafkaConfig,
+            zkUtils: ZkUtils,
+            replicaManager: ReplicaManager,
+            heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+            joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+            time: Time): GroupCoordinator = {
     val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
@@ -741,7 +751,7 @@ object GroupCoordinator {
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
     val groupManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, time)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, heartbeatPurgatory, joinPurgatory, time)
   }
 
 }
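
The constructor change above is what makes the rest of this patch testable: the two purgatories are now built by the caller and injected, so a test can back them with a MockTimer and drive expiration by hand instead of waiting on the reaper thread. A minimal sketch of the new wiring, assuming it sits in package kafka.coordinator alongside the coordinator classes and that config, zkUtils and replicaManager are set up as in the updated test further down (CoordinatorWiringSketch itself is hypothetical):

    package kafka.coordinator

    import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
    import kafka.utils.ZkUtils
    import kafka.utils.timer.MockTimer

    object CoordinatorWiringSketch {
      def build(config: KafkaConfig, zkUtils: ZkUtils, replicaManager: ReplicaManager): GroupCoordinator = {
        val timer = new MockTimer                     // deterministic clock, no background threads
        val heartbeatPurgatory =
          new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
        val joinPurgatory =
          new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
        // The new overload accepts the purgatories; the old three-argument apply still
        // builds SystemTimer-backed purgatories with the reaper enabled for production use.
        GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
      }
    }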

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 0b53532..2205568 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -47,9 +47,7 @@ import com.yammer.metrics.core.Gauge
  *
  * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
  */
-abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging {
-
-  override val expirationMs = delayMs + System.currentTimeMillis()
+abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
 
@@ -110,19 +108,27 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging {
   }
 }
 
+object DelayedOperationPurgatory {
+
+  def apply[T <: DelayedOperation](purgatoryName: String,
+                                   brokerId: Int = 0,
+                                   purgeInterval: Int = 1000): DelayedOperationPurgatory[T] = {
+    val timer = new SystemTimer(purgatoryName)
+    new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval)
+  }
+
+}
+
 /**
  * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
  */
-class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000)
+class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
+                                                       timeoutTimer: Timer,
+                                                       brokerId: Int = 0,
+                                                       purgeInterval: Int = 1000,
+                                                       reaperEnabled: Boolean = true)
         extends Logging with KafkaMetricsGroup {
 
-  // timeout timer
-  private[this] val executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
-    def newThread(runnable: Runnable): Thread =
-      Utils.newThread("executor-"+purgatoryName, runnable, false)
-  })
-  private[this] val timeoutTimer = new Timer(executor)
-
   /* a list of operation watching keys */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
 
@@ -152,7 +158,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
     metricsTags
   )
 
-  expirationReaper.start()
+  if (reaperEnabled)
+    expirationReaper.start()
 
   /**
    * Check if the operation can be completed, if not watch it based on the given watch keys
@@ -275,8 +282,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
    * Shutdown the expire reaper thread
    */
   def shutdown() {
-    expirationReaper.shutdown()
-    executor.shutdown()
+    if (reaperEnabled)
+      expirationReaper.shutdown()
+    timeoutTimer.shutdown()
   }
 
   /**
@@ -338,6 +346,23 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
     }
   }
 
+  def advanceClock(timeoutMs: Long) {
+    timeoutTimer.advanceClock(timeoutMs)
+
+    // Trigger a purge if the number of completed but still being watched operations is larger than
+    // the purge threshold. That number is computed by the difference btw the estimated total number of
+    // operations and the number of pending delayed operations.
+    if (estimatedTotalOperations.get - delayed > purgeInterval) {
+      // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
+      // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
+      // a little overestimated total number of operations.
+      estimatedTotalOperations.getAndSet(delayed)
+      debug("Begin purging watch lists")
+      val purged = allWatchers.map(_.purgeCompleted()).sum
+      debug("Purged %d elements from watch lists.".format(purged))
+    }
+  }
+
   /**
    * A background reaper to expire delayed operations that have timed out
    */
@@ -346,20 +371,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
     false) {
 
     override def doWork() {
-      timeoutTimer.advanceClock(200L)
-
-      // Trigger a purge if the number of completed but still being watched operations is larger than
-      // the purge threshold. That number is computed by the difference btw the estimated total number of
-      // operations and the number of pending delayed operations.
-      if (estimatedTotalOperations.get - delayed > purgeInterval) {
-        // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
-        // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
-        // a little overestimated total number of operations.
-        estimatedTotalOperations.getAndSet(delayed)
-        debug("Begin purging watch lists")
-        val purged = allWatchers.map(_.purgeCompleted()).sum
-        debug("Purged %d elements from watch lists.".format(purged))
-      }
+      advanceClock(200L)
     }
   }
 }
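
With the timer injected and the reaper optional, a purgatory can now be exercised without any background threads: the new advanceClock() advances the injected timer and performs the same watch-list purge check the reaper thread used to run every 200 ms. A small sketch of that flow, using a hypothetical operation (DelayedNoop is not part of the patch) and the MockTimer added in the test tree:

    import kafka.server.{DelayedOperation, DelayedOperationPurgatory}
    import kafka.utils.timer.MockTimer

    // Hypothetical operation that never completes on its own, so it can only expire.
    class DelayedNoop(delayMs: Long) extends DelayedOperation(delayMs) {
      override def tryComplete(): Boolean = false
      override def onComplete(): Unit = ()
      override def onExpiration(): Unit = println("expired")
    }

    val timer = new MockTimer
    val purgatory = new DelayedOperationPurgatory[DelayedNoop]("noop", timer, brokerId = 0, reaperEnabled = false)

    purgatory.tryCompleteElseWatch(new DelayedNoop(500L), Seq("some-key"))
    purgatory.advanceClock(600L)   // advances the MockTimer past the delay; the operation expires,
                                   // running onComplete() and onExpiration(); completed operations are
                                   // purged from the watch lists once the purge threshold is crossed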

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 22657f4..9bbd29e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -122,9 +122,9 @@ class ReplicaManager(val config: KafkaConfig,
   private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
   private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
 
-  val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
+  val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
     purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
-  val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
+  val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
     purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
 
   val leaderCount = newGauge(

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index bdd0e75..2d78665 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -16,14 +16,52 @@
  */
 package kafka.utils.timer
 
-import java.util.concurrent.{DelayQueue, ExecutorService, TimeUnit}
+import java.util.concurrent.{DelayQueue, Executors, ThreadFactory, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import kafka.utils.threadsafe
+import org.apache.kafka.common.utils.Utils
+
+trait Timer {
+  /**
+    * Add a new task to this executor. It will be executed after the task's delay
+    * (beginning from the time of submission)
+    * @param timerTask the task to add
+    */
+  def add(timerTask: TimerTask): Unit
+
+  /**
+    * Advance the internal clock, executing any tasks whose expiration has been
+    * reached within the duration of the passed timeout.
+    * @param timeoutMs
+    * @return whether or not any tasks were executed
+    */
+  def advanceClock(timeoutMs: Long): Boolean
+
+  /**
+    * Get the number of tasks pending execution
+    * @return the number of tasks
+    */
+  def size: Int
+
+  /**
+    * Shutdown the timer service, leaving pending tasks unexecuted
+    */
+  def shutdown(): Unit
+}
 
 @threadsafe
-class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = System.currentTimeMillis) {
+class SystemTimer(executorName: String,
+                  tickMs: Long = 1,
+                  wheelSize: Int = 20,
+                  startMs: Long = System.currentTimeMillis) extends Timer {
+
+  // timeout timer
+  private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+    def newThread(runnable: Runnable): Thread =
+      Utils.newThread("executor-"+executorName, runnable, false)
+  })
 
   private[this] val delayQueue = new DelayQueue[TimerTaskList]()
   private[this] val taskCounter = new AtomicInteger(0)
@@ -43,7 +81,7 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20
   def add(timerTask: TimerTask): Unit = {
     readLock.lock()
     try {
-      addTimerTaskEntry(new TimerTaskEntry(timerTask))
+      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
     } finally {
       readLock.unlock()
     }
@@ -82,6 +120,11 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20
     }
   }
 
-  def size(): Int = taskCounter.get
+  def size: Int = taskCounter.get
+
+  override def shutdown() {
+    taskExecutor.shutdown()
+  }
+
 }
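
Timer is now a trait, so tests can substitute their own implementation, and the expiration timestamp is computed from delayMs when a task is added rather than being baked into the task itself. A minimal sketch of using the concrete SystemTimer directly (PrintTask is hypothetical, purely for illustration):

    import kafka.utils.timer.{SystemTimer, TimerTask}

    // Hypothetical task; delayMs is a relative delay, converted to an absolute
    // expiration by the timer at add() time.
    class PrintTask(override val delayMs: Long) extends TimerTask {
      override def run(): Unit = println(s"fired after $delayMs ms")
    }

    val timer = new SystemTimer("demo")   // now owns its single-thread task executor
    timer.add(new PrintTask(100L))
    while (timer.advanceClock(200L)) { }  // blocks up to 200 ms per call, running tasks that came due
    timer.shutdown()                      // new: shuts the executor down, leaving pending tasks unexecuted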
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimerTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
index d6b3a2e..6623854 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
@@ -18,7 +18,7 @@ package kafka.utils.timer
 
 trait TimerTask extends Runnable {
 
-  val expirationMs: Long // timestamp in millisecond
+  val delayMs: Long // timestamp in millisecond
 
   private[this] var timerTaskEntry: TimerTaskEntry = null
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
index c4aeb5d..e862f4f 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -29,7 +29,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
   // TimerTaskList forms a doubly linked cyclic list using a dummy root entry
   // root.next points to the head
   // root.prev points to the tail
-  private[this] val root = new TimerTaskEntry(null)
+  private[this] val root = new TimerTaskEntry(null, -1)
   root.next = root
   root.prev = root
 
@@ -131,7 +131,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
 
 }
 
-private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
+private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
 
   @volatile
   var list: TimerTaskList = null
@@ -157,5 +157,8 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
     }
   }
 
+  override def compare(that: TimerTaskEntry): Int = {
+    this.expirationMs compare that.expirationMs
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
index f5b6efe..4535f3f 100644
--- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
@@ -123,7 +123,7 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta
   }
 
   def add(timerTaskEntry: TimerTaskEntry): Boolean = {
-    val expiration = timerTaskEntry.timerTask.expirationMs
+    val expiration = timerTaskEntry.expirationMs
 
     if (timerTaskEntry.cancelled) {
       // Cancelled

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 744be3b..ba89fc8 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -95,7 +95,7 @@ object TestPurgatoryPerformance {
     val latencySamples = new LatencySamples(1000000, pct75, pct50)
     val intervalSamples = new IntervalSamples(1000000, requestRate)
 
-    val purgatory = new DelayedOperationPurgatory[FakeOperation]("fake purgatory")
+    val purgatory = DelayedOperationPurgatory[FakeOperation]("fake purgatory")
     val queue = new CompletionQueue()
 
     val gcNames = gcMXBeans.map(_.getName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index acdb660..beab1b5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -17,14 +17,14 @@
 
 package kafka.coordinator
 
+import kafka.utils.timer.MockTimer
 import org.apache.kafka.common.record.Record
 import org.junit.Assert._
-import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.common.OffsetAndMetadata
 import kafka.message.{Message, MessageSet}
-import kafka.server.{ReplicaManager, KafkaConfig}
+import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig}
 import kafka.utils._
-import org.apache.kafka.common.utils.SystemTime
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{utils, TopicPartition}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -56,6 +56,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   val ConsumerMinSessionTimeout = 10
   val ConsumerMaxSessionTimeout = 1000
   val DefaultSessionTimeout = 500
+  var timer: MockTimer = null
   var groupCoordinator: GroupCoordinator = null
   var replicaManager: ReplicaManager = null
   var scheduler: KafkaScheduler = null
@@ -87,7 +88,14 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
-    groupCoordinator = GroupCoordinator(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
+    timer = new MockTimer
+
+    val config = KafkaConfig.fromProps(props)
+
+    val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
+    val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
+
+    groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
     groupCoordinator.startup()
 
     // add the partition into the owned partition list
@@ -284,6 +292,90 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   }
 
   @Test
+  def testSessionTimeout() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
+    EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    timer.advanceClock(DefaultSessionTimeout + 100)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatMaintainsSession() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val sessionTimeout = 1000
+
+    val joinGroupResult = joinGroup(groupId, memberId, sessionTimeout, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    timer.advanceClock(sessionTimeout / 2)
+
+    EasyMock.reset(replicaManager)
+    var heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE.code, heartbeatResult)
+
+    timer.advanceClock(sessionTimeout / 2 + 100)
+
+    EasyMock.reset(replicaManager)
+    heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE.code, heartbeatResult)
+  }
+
+  @Test
+  def testCommitMaintainsSession() {
+    val sessionTimeout = 1000
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+
+    val joinGroupResult = joinGroup(groupId, memberId, sessionTimeout, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupErrorCode = joinGroupResult.errorCode
+    assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    timer.advanceClock(sessionTimeout / 2)
+
+    EasyMock.reset(replicaManager)
+    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE.code, commitOffsetResult(tp))
+
+    timer.advanceClock(sessionTimeout / 2 + 100)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE.code, heartbeatResult)
+  }
+
+  @Test
   def testSyncGroupEmptyAssignment() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
@@ -459,7 +551,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     // with no leader SyncGroup, the follower's request should failure with an error indicating
     // that it should rejoin
     EasyMock.reset(replicaManager)
-    val followerSyncFuture= sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
+    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
+
+    timer.advanceClock(DefaultSessionTimeout + 100)
+
     val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
     assertEquals(Errors.REBALANCE_IN_PROGRESS.code, followerSyncResult._2)
   }
@@ -628,17 +723,20 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
   @Test
   def testGenerationIdIncrementsOnRebalance() {
-    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-
-    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, protocolType, protocols)
     val initialGenerationId = joinGroupResult.generationId
     val joinGroupErrorCode = joinGroupResult.errorCode
+    val memberId = joinGroupResult.memberId
     assertEquals(1, initialGenerationId)
     assertEquals(Errors.NONE.code, joinGroupErrorCode)
 
     EasyMock.reset(replicaManager)
-    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols)
+    val syncGroupResult = syncGroupLeader(groupId, initialGenerationId, memberId, Map(memberId -> Array[Byte]()))
+    val syncGroupErrorCode = syncGroupResult._2
+    assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
     val nextGenerationId = otherJoinGroupResult.generationId
     val otherJoinGroupErrorCode = otherJoinGroupResult.errorCode
     assertEquals(2, nextGenerationId)
@@ -860,6 +958,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
                         protocolType: String,
                         protocols: List[(String, Array[Byte])]): JoinGroupResult = {
     val responseFuture = sendJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols)
+    timer.advanceClock(10)
     // should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
     Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index df8d5b1..2c70137 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -26,7 +26,7 @@ class DelayedOperationTest {
 
   @Before
   def setUp() {
-    purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
+    purgatory = DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/MockTime.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala
index ee65748..0858e04 100644
--- a/core/src/test/scala/unit/kafka/utils/MockTime.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala
@@ -19,6 +19,8 @@ package kafka.utils
 
 import java.util.concurrent._
 
+import org.apache.kafka.common.utils
+
 /**
  * A class used for unit testing things which depend on the Time interface.
  * 
@@ -47,3 +49,13 @@ class MockTime(@volatile private var currentMs: Long) extends Time {
   override def toString() = "MockTime(%d)".format(milliseconds)
 
 }
+
+object MockTime {
+  implicit def toCommonTime(time: MockTime): utils.Time = new utils.Time {
+    override def nanoseconds(): Long = time.nanoseconds
+
+    override def milliseconds(): Long = time.milliseconds
+
+    override def sleep(ms: Long): Unit = time.sleep(ms)
+  }
+}
\ No newline at end of file
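
The new companion object makes kafka.utils.MockTime usable where the client-side org.apache.kafka.common.utils.Time interface is expected, which is what lets the updated GroupCoordinatorResponseTest above hand timer.time straight to GroupCoordinator. A small sketch of the conversion (value names are illustrative):

    import kafka.utils.MockTime
    import org.apache.kafka.common.utils

    val mockTime = new MockTime
    // Because toCommonTime lives in MockTime's companion object, the conversion is
    // found implicitly; no extra import is required.
    val commonTime: utils.Time = mockTime
    commonTime.sleep(100)
    assert(commonTime.milliseconds == mockTime.milliseconds)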

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
new file mode 100644
index 0000000..d18a060
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
@@ -0,0 +1,57 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.utils.timer
+
+import kafka.utils.MockTime
+
+import scala.collection.mutable
+
+class MockTimer extends Timer {
+
+  val time = new MockTime
+  private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]()
+
+  def add(timerTask: TimerTask) {
+    if (timerTask.delayMs <= 0)
+      timerTask.run()
+    else
+      taskQueue.enqueue(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds))
+  }
+
+  def advanceClock(timeoutMs: Long): Boolean = {
+    time.sleep(timeoutMs)
+
+    var executed = false
+    val now = time.milliseconds
+
+    while (taskQueue.nonEmpty && now > taskQueue.head.expirationMs) {
+      val taskEntry = taskQueue.dequeue()
+      if (!taskEntry.cancelled) {
+        val task = taskEntry.timerTask
+        task.run()
+        executed = true
+      }
+    }
+
+    executed
+  }
+
+  def size: Int = taskQueue.size
+
+  override def shutdown(): Unit = {}
+
+}
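
MockTimer is what makes the new GroupCoordinator tests deterministic: time only moves when advanceClock() is called, and due tasks run synchronously on the caller's thread. A tiny sketch of that behavior (FlagTask is hypothetical, for illustration only):

    import kafka.utils.timer.{MockTimer, TimerTask}

    // Hypothetical task used to observe expiration.
    class FlagTask(override val delayMs: Long) extends TimerTask {
      @volatile var fired = false
      override def run(): Unit = fired = true
    }

    val timer = new MockTimer
    val task = new FlagTask(1000L)
    timer.add(task)

    timer.advanceClock(500L)    // clock advances, but the task is not yet due
    assert(!task.fired)
    timer.advanceClock(600L)    // now past the expiration, so run() executes synchronously
    assert(task.fired)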

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
index a018dde..29c9067 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
@@ -18,11 +18,11 @@ package kafka.utils.timer
 
 import org.junit.Assert._
 import java.util.concurrent.atomic._
-import org.junit.{Test, After, Before}
+import org.junit.Test
 
 class TimerTaskListTest {
 
-  private class TestTask(val expirationMs: Long) extends TimerTask {
+  private class TestTask(val delayMs: Long) extends TimerTask {
     def run(): Unit = { }
   }
 
@@ -42,8 +42,8 @@ class TimerTaskListTest {
     val list3 = new TimerTaskList(sharedCounter)
 
     val tasks = (1 to 10).map { i =>
-      val task = new TestTask(10L)
-      list1.add(new TimerTaskEntry(task))
+      val task = new TestTask(0L)
+      list1.add(new TimerTaskEntry(task, 10L))
       assertEquals(i, sharedCounter.get)
       task
     }.toSeq
@@ -54,7 +54,7 @@ class TimerTaskListTest {
     tasks.take(4).foreach { task =>
       val prevCount = sharedCounter.get
       // new TimerTaskEntry(task) will remove the existing entry from the list
-      list2.add(new TimerTaskEntry(task))
+      list2.add(new TimerTaskEntry(task, 10L))
       assertEquals(prevCount, sharedCounter.get)
     }
     assertEquals(10 - 4, size(list1))
@@ -66,7 +66,7 @@ class TimerTaskListTest {
     tasks.drop(4).foreach { task =>
       val prevCount = sharedCounter.get
       // new TimerTaskEntry(task) will remove the existing entry from the list
-      list3.add(new TimerTaskEntry(task))
+      list3.add(new TimerTaskEntry(task, 10L))
       assertEquals(prevCount, sharedCounter.get)
     }
     assertEquals(0, size(list1))

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
index 95de378..54b73b8 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
@@ -27,7 +27,7 @@ import scala.util.Random
 
 class TimerTest {
 
-  private class TestTask(override val expirationMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask {
+  private class TestTask(override val delayMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask {
     private[this] val completed = new AtomicBoolean(false)
     def run(): Unit = {
       if (completed.compareAndSet(false, true)) {
@@ -37,32 +37,31 @@ class TimerTest {
     }
   }
 
-  private[this] var executor: ExecutorService = null
+  private[this] var timer: Timer = null
 
   @Before
   def setup() {
-    executor = Executors.newSingleThreadExecutor()
+    timer = new SystemTimer("test", tickMs = 1, wheelSize = 3)
   }
 
   @After
   def teardown(): Unit = {
-    executor.shutdown()
-    executor = null
+    timer.shutdown()
   }
 
   @Test
   def testAlreadyExpiredTask(): Unit = {
-    val startTime = System.currentTimeMillis()
-    val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime)
     val output = new ArrayBuffer[Int]()
 
 
     val latches = (-5 until 0).map { i =>
       val latch = new CountDownLatch(1)
-      timer.add(new TestTask(startTime + i, i, latch, output))
+      timer.add(new TestTask(i, i, latch, output))
       latch
     }
 
+    timer.advanceClock(0)
+
     latches.take(5).foreach { latch =>
       assertEquals("already expired tasks should run immediately", true, latch.await(3, TimeUnit.SECONDS))
     }
@@ -72,8 +71,6 @@ class TimerTest {
 
   @Test
   def testTaskExpiration(): Unit = {
-    val startTime = System.currentTimeMillis()
-    val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime)
     val output = new ArrayBuffer[Int]()
 
     val tasks = new ArrayBuffer[TestTask]()
@@ -82,27 +79,27 @@ class TimerTest {
     val latches =
       (0 until 5).map { i =>
         val latch = new CountDownLatch(1)
-        tasks += new TestTask(startTime + i, i, latch, output)
+        tasks += new TestTask(i, i, latch, output)
         ids += i
         latch
       } ++ (10 until 100).map { i =>
         val latch = new CountDownLatch(2)
-        tasks += new TestTask(startTime + i, i, latch, output)
-        tasks += new TestTask(startTime + i, i, latch, output)
+        tasks += new TestTask(i, i, latch, output)
+        tasks += new TestTask(i, i, latch, output)
         ids += i
         ids += i
         latch
       } ++ (100 until 500).map { i =>
         val latch = new CountDownLatch(1)
-        tasks += new TestTask(startTime + i, i, latch, output)
+        tasks += new TestTask(i, i, latch, output)
         ids += i
         latch
       }
 
     // randomly submit requests
-    Random.shuffle(tasks.toSeq).foreach { task => timer.add(task) }
+    tasks.foreach { task => timer.add(task) }
 
-    while (timer.advanceClock(1000)) {}
+    while (timer.advanceClock(2000)) {}
 
     latches.foreach { latch => latch.await() }