You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/14 03:09:12 UTC
kafka git commit: KAFKA-3470: treat commits as member heartbeats
Repository: kafka
Updated Branches:
refs/heads/trunk c1694833d -> f4d3d2865
KAFKA-3470: treat commits as member heartbeats
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>
Closes #1206 from hachikuji/KAFKA-3470
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4d3d286
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4d3d286
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4d3d286
Branch: refs/heads/trunk
Commit: f4d3d2865894f1a1ade9d92ac27931fd35d16cae
Parents: c169483
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Apr 13 18:09:08 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 13 18:09:08 2016 -0700
----------------------------------------------------------------------
.../kafka/coordinator/GroupCoordinator.scala | 22 +++-
.../scala/kafka/server/DelayedOperation.scala | 68 ++++++-----
.../scala/kafka/server/ReplicaManager.scala | 4 +-
.../main/scala/kafka/utils/timer/Timer.scala | 51 +++++++-
.../scala/kafka/utils/timer/TimerTask.scala | 2 +-
.../scala/kafka/utils/timer/TimerTaskList.scala | 7 +-
.../scala/kafka/utils/timer/TimingWheel.scala | 2 +-
.../other/kafka/TestPurgatoryPerformance.scala | 2 +-
.../GroupCoordinatorResponseTest.scala | 121 +++++++++++++++++--
.../kafka/server/DelayedOperationTest.scala | 2 +-
.../test/scala/unit/kafka/utils/MockTime.scala | 12 ++
.../unit/kafka/utils/timer/MockTimer.scala | 57 +++++++++
.../kafka/utils/timer/TimerTaskListTest.scala | 12 +-
.../unit/kafka/utils/timer/TimerTest.scala | 29 ++---
14 files changed, 312 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 30a3a78..fb71254 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -51,6 +51,8 @@ class GroupCoordinator(val brokerId: Int,
val groupConfig: GroupConfig,
val offsetConfig: OffsetConfig,
val groupManager: GroupMetadataManager,
+ val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+ val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
time: Time) extends Logging {
type JoinCallback = JoinGroupResult => Unit
type SyncCallback = (Array[Byte], Short) => Unit
@@ -59,9 +61,6 @@ class GroupCoordinator(val brokerId: Int,
private val isActive = new AtomicBoolean(false)
- private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
- private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null
-
def offsetsTopicConfigs: Properties = {
val props = new Properties
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
@@ -80,8 +79,6 @@ class GroupCoordinator(val brokerId: Int,
*/
def startup() {
info("Starting up.")
- heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
- joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
isActive.set(true)
info("Startup complete.")
}
@@ -414,6 +411,8 @@ class GroupCoordinator(val brokerId: Int,
} else if (generationId != group.generationId) {
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
} else {
+ val member = group.get(memberId)
+ completeAndScheduleNextHeartbeatExpiration(group, member)
delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
offsetMetadata, responseCallback))
}
@@ -729,6 +728,17 @@ object GroupCoordinator {
zkUtils: ZkUtils,
replicaManager: ReplicaManager,
time: Time): GroupCoordinator = {
+ val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
+ val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
+ apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
+ }
+
+ def apply(config: KafkaConfig,
+ zkUtils: ZkUtils,
+ replicaManager: ReplicaManager,
+ heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
+ joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+ time: Time): GroupCoordinator = {
val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
@@ -741,7 +751,7 @@ object GroupCoordinator {
groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
val groupManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
- new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, time)
+ new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, heartbeatPurgatory, joinPurgatory, time)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 0b53532..2205568 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -47,9 +47,7 @@ import com.yammer.metrics.core.Gauge
*
* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
*/
-abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging {
-
- override val expirationMs = delayMs + System.currentTimeMillis()
+abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
private val completed = new AtomicBoolean(false)
@@ -110,19 +108,27 @@ abstract class DelayedOperation(delayMs: Long) extends TimerTask with Logging {
}
}
+object DelayedOperationPurgatory {
+
+ def apply[T <: DelayedOperation](purgatoryName: String,
+ brokerId: Int = 0,
+ purgeInterval: Int = 1000): DelayedOperationPurgatory[T] = {
+ val timer = new SystemTimer(purgatoryName)
+ new DelayedOperationPurgatory[T](purgatoryName, timer, brokerId, purgeInterval)
+ }
+
+}
+
/**
* A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
*/
-class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000)
+class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
+ timeoutTimer: Timer,
+ brokerId: Int = 0,
+ purgeInterval: Int = 1000,
+ reaperEnabled: Boolean = true)
extends Logging with KafkaMetricsGroup {
- // timeout timer
- private[this] val executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
- def newThread(runnable: Runnable): Thread =
- Utils.newThread("executor-"+purgatoryName, runnable, false)
- })
- private[this] val timeoutTimer = new Timer(executor)
-
/* a list of operation watching keys */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
@@ -152,7 +158,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
metricsTags
)
- expirationReaper.start()
+ if (reaperEnabled)
+ expirationReaper.start()
/**
* Check if the operation can be completed, if not watch it based on the given watch keys
@@ -275,8 +282,9 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
* Shutdown the expire reaper thread
*/
def shutdown() {
- expirationReaper.shutdown()
- executor.shutdown()
+ if (reaperEnabled)
+ expirationReaper.shutdown()
+ timeoutTimer.shutdown()
}
/**
@@ -338,6 +346,23 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
}
}
+ def advanceClock(timeoutMs: Long) {
+ timeoutTimer.advanceClock(timeoutMs)
+
+ // Trigger a purge if the number of completed but still being watched operations is larger than
+ // the purge threshold. That number is computed by the difference between the estimated total number of
+ // operations and the number of pending delayed operations.
+ if (estimatedTotalOperations.get - delayed > purgeInterval) {
+ // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
+ // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
+ // a little overestimated total number of operations.
+ estimatedTotalOperations.getAndSet(delayed)
+ debug("Begin purging watch lists")
+ val purged = allWatchers.map(_.purgeCompleted()).sum
+ debug("Purged %d elements from watch lists.".format(purged))
+ }
+ }
+
/**
* A background reaper to expire delayed operations that have timed out
*/
@@ -346,20 +371,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br
false) {
override def doWork() {
- timeoutTimer.advanceClock(200L)
-
- // Trigger a purge if the number of completed but still being watched operations is larger than
- // the purge threshold. That number is computed by the difference btw the estimated total number of
- // operations and the number of pending delayed operations.
- if (estimatedTotalOperations.get - delayed > purgeInterval) {
- // now set estimatedTotalOperations to delayed (the number of pending operations) since we are going to
- // clean up watchers. Note that, if more operations are completed during the clean up, we may end up with
- // a little overestimated total number of operations.
- estimatedTotalOperations.getAndSet(delayed)
- debug("Begin purging watch lists")
- val purged = allWatchers.map(_.purgeCompleted()).sum
- debug("Purged %d elements from watch lists.".format(purged))
- }
+ advanceClock(200L)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 22657f4..9bbd29e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -122,9 +122,9 @@ class ReplicaManager(val config: KafkaConfig,
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
- val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
+ val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
- val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
+ val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
val leaderCount = newGauge(
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/Timer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/Timer.scala b/core/src/main/scala/kafka/utils/timer/Timer.scala
index bdd0e75..2d78665 100644
--- a/core/src/main/scala/kafka/utils/timer/Timer.scala
+++ b/core/src/main/scala/kafka/utils/timer/Timer.scala
@@ -16,14 +16,52 @@
*/
package kafka.utils.timer
-import java.util.concurrent.{DelayQueue, ExecutorService, TimeUnit}
+import java.util.concurrent.{DelayQueue, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.threadsafe
+import org.apache.kafka.common.utils.Utils
+
+trait Timer {
+ /**
+ * Add a new task to this timer. It will be executed after the task's delay
+ * (beginning from the time of submission)
+ * @param timerTask the task to add
+ */
+ def add(timerTask: TimerTask): Unit
+
+ /**
+ * Advance the internal clock, executing any tasks whose expiration has been
+ * reached within the duration of the passed timeout.
+ * @param timeoutMs the timeout duration, in milliseconds
+ * @return whether or not any tasks were executed
+ */
+ def advanceClock(timeoutMs: Long): Boolean
+
+ /**
+ * Get the number of tasks pending execution
+ * @return the number of tasks
+ */
+ def size: Int
+
+ /**
+ * Shutdown the timer service, leaving pending tasks unexecuted
+ */
+ def shutdown(): Unit
+}
@threadsafe
-class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20, startMs: Long = System.currentTimeMillis) {
+class SystemTimer(executorName: String,
+ tickMs: Long = 1,
+ wheelSize: Int = 20,
+ startMs: Long = System.currentTimeMillis) extends Timer {
+
+ // timeout timer
+ private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
+ def newThread(runnable: Runnable): Thread =
+ Utils.newThread("executor-"+executorName, runnable, false)
+ })
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
@@ -43,7 +81,7 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20
def add(timerTask: TimerTask): Unit = {
readLock.lock()
try {
- addTimerTaskEntry(new TimerTaskEntry(timerTask))
+ addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
} finally {
readLock.unlock()
}
@@ -82,6 +120,11 @@ class Timer(taskExecutor: ExecutorService, tickMs: Long = 1, wheelSize: Int = 20
}
}
- def size(): Int = taskCounter.get
+ def size: Int = taskCounter.get
+
+ override def shutdown() {
+ taskExecutor.shutdown()
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimerTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTask.scala b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
index d6b3a2e..6623854 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTask.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTask.scala
@@ -18,7 +18,7 @@ package kafka.utils.timer
trait TimerTask extends Runnable {
- val expirationMs: Long // timestamp in millisecond
+ val delayMs: Long // delay in milliseconds, relative to the time the task is added to a timer
private[this] var timerTaskEntry: TimerTaskEntry = null
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
index c4aeb5d..e862f4f 100644
--- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
@@ -29,7 +29,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
// TimerTaskList forms a doubly linked cyclic list using a dummy root entry
// root.next points to the head
// root.prev points to the tail
- private[this] val root = new TimerTaskEntry(null)
+ private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = root
@@ -131,7 +131,7 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
}
-private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
+private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {
@volatile
var list: TimerTaskList = null
@@ -157,5 +157,8 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask) {
}
}
+ override def compare(that: TimerTaskEntry): Int = {
+ this.expirationMs compare that.expirationMs
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
index f5b6efe..4535f3f 100644
--- a/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
+++ b/core/src/main/scala/kafka/utils/timer/TimingWheel.scala
@@ -123,7 +123,7 @@ private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, ta
}
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
- val expiration = timerTaskEntry.timerTask.expirationMs
+ val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
// Cancelled
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 744be3b..ba89fc8 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -95,7 +95,7 @@ object TestPurgatoryPerformance {
val latencySamples = new LatencySamples(1000000, pct75, pct50)
val intervalSamples = new IntervalSamples(1000000, requestRate)
- val purgatory = new DelayedOperationPurgatory[FakeOperation]("fake purgatory")
+ val purgatory = DelayedOperationPurgatory[FakeOperation]("fake purgatory")
val queue = new CompletionQueue()
val gcNames = gcMXBeans.map(_.getName)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index acdb660..beab1b5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -17,14 +17,14 @@
package kafka.coordinator
+import kafka.utils.timer.MockTimer
import org.apache.kafka.common.record.Record
import org.junit.Assert._
-import kafka.common.{OffsetAndMetadata, TopicAndPartition}
+import kafka.common.OffsetAndMetadata
import kafka.message.{Message, MessageSet}
-import kafka.server.{ReplicaManager, KafkaConfig}
+import kafka.server.{DelayedOperationPurgatory, ReplicaManager, KafkaConfig}
import kafka.utils._
-import org.apache.kafka.common.utils.SystemTime
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{utils, TopicPartition}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{OffsetCommitRequest, JoinGroupRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -56,6 +56,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
val ConsumerMinSessionTimeout = 10
val ConsumerMaxSessionTimeout = 1000
val DefaultSessionTimeout = 500
+ var timer: MockTimer = null
var groupCoordinator: GroupCoordinator = null
var replicaManager: ReplicaManager = null
var scheduler: KafkaScheduler = null
@@ -87,7 +88,14 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
EasyMock.replay(zkUtils)
- groupCoordinator = GroupCoordinator(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
+ timer = new MockTimer
+
+ val config = KafkaConfig.fromProps(props)
+
+ val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
+ val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
+
+ groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
groupCoordinator.startup()
// add the partition into the owned partition list
@@ -284,6 +292,90 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
}
@Test
+ def testSessionTimeout() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+ val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val assignedConsumerId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
+ EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+ EasyMock.replay(replicaManager)
+
+ timer.advanceClock(DefaultSessionTimeout + 100)
+
+ EasyMock.reset(replicaManager)
+ val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code, heartbeatResult)
+ }
+
+ @Test
+ def testHeartbeatMaintainsSession() {
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val sessionTimeout = 1000
+
+ val joinGroupResult = joinGroup(groupId, memberId, sessionTimeout, protocolType, protocols)
+ val assignedConsumerId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ timer.advanceClock(sessionTimeout / 2)
+
+ EasyMock.reset(replicaManager)
+ var heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+ assertEquals(Errors.NONE.code, heartbeatResult)
+
+ timer.advanceClock(sessionTimeout / 2 + 100)
+
+ EasyMock.reset(replicaManager)
+ heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+ assertEquals(Errors.NONE.code, heartbeatResult)
+ }
+
+ @Test
+ def testCommitMaintainsSession() {
+ val sessionTimeout = 1000
+ val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+ val tp = new TopicPartition("topic", 0)
+ val offset = OffsetAndMetadata(0)
+
+ val joinGroupResult = joinGroup(groupId, memberId, sessionTimeout, protocolType, protocols)
+ val assignedConsumerId = joinGroupResult.memberId
+ val generationId = joinGroupResult.generationId
+ val joinGroupErrorCode = joinGroupResult.errorCode
+ assertEquals(Errors.NONE.code, joinGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val (_, syncGroupErrorCode) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ timer.advanceClock(sessionTimeout / 2)
+
+ EasyMock.reset(replicaManager)
+ val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset))
+ assertEquals(Errors.NONE.code, commitOffsetResult(tp))
+
+ timer.advanceClock(sessionTimeout / 2 + 100)
+
+ EasyMock.reset(replicaManager)
+ val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+ assertEquals(Errors.NONE.code, heartbeatResult)
+ }
+
+ @Test
def testSyncGroupEmptyAssignment() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
@@ -459,7 +551,10 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
// with no leader SyncGroup, the follower's request should failure with an error indicating
// that it should rejoin
EasyMock.reset(replicaManager)
- val followerSyncFuture= sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
+ val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
+
+ timer.advanceClock(DefaultSessionTimeout + 100)
+
val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
assertEquals(Errors.REBALANCE_IN_PROGRESS.code, followerSyncResult._2)
}
@@ -628,17 +723,20 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
@Test
def testGenerationIdIncrementsOnRebalance() {
- val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-
- val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
+ val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, DefaultSessionTimeout, protocolType, protocols)
val initialGenerationId = joinGroupResult.generationId
val joinGroupErrorCode = joinGroupResult.errorCode
+ val memberId = joinGroupResult.memberId
assertEquals(1, initialGenerationId)
assertEquals(Errors.NONE.code, joinGroupErrorCode)
EasyMock.reset(replicaManager)
- val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, protocolType, protocols)
+ val syncGroupResult = syncGroupLeader(groupId, initialGenerationId, memberId, Map(memberId -> Array[Byte]()))
+ val syncGroupErrorCode = syncGroupResult._2
+ assertEquals(Errors.NONE.code, syncGroupErrorCode)
+
+ EasyMock.reset(replicaManager)
+ val otherJoinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
val nextGenerationId = otherJoinGroupResult.generationId
val otherJoinGroupErrorCode = otherJoinGroupResult.errorCode
assertEquals(2, nextGenerationId)
@@ -860,6 +958,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
protocolType: String,
protocols: List[(String, Array[Byte])]): JoinGroupResult = {
val responseFuture = sendJoinGroup(groupId, memberId, sessionTimeout, protocolType, protocols)
+ timer.advanceClock(10)
// should only have to wait as long as session timeout, but allow some extra time in case of an unexpected delay
Await.result(responseFuture, Duration(sessionTimeout+100, TimeUnit.MILLISECONDS))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index df8d5b1..2c70137 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -26,7 +26,7 @@ class DelayedOperationTest {
@Before
def setUp() {
- purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
+ purgatory = DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
}
@After
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/MockTime.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockTime.scala b/core/src/test/scala/unit/kafka/utils/MockTime.scala
index ee65748..0858e04 100644
--- a/core/src/test/scala/unit/kafka/utils/MockTime.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockTime.scala
@@ -19,6 +19,8 @@ package kafka.utils
import java.util.concurrent._
+import org.apache.kafka.common.utils
+
/**
* A class used for unit testing things which depend on the Time interface.
*
@@ -47,3 +49,13 @@ class MockTime(@volatile private var currentMs: Long) extends Time {
override def toString() = "MockTime(%d)".format(milliseconds)
}
+
+object MockTime {
+ implicit def toCommonTime(time: MockTime): utils.Time = new utils.Time {
+ override def nanoseconds(): Long = time.nanoseconds
+
+ override def milliseconds(): Long = time.milliseconds
+
+ override def sleep(ms: Long): Unit = time.sleep(ms)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
new file mode 100644
index 0000000..d18a060
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/timer/MockTimer.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils.timer
+
+import kafka.utils.MockTime
+
+import scala.collection.mutable
+
+class MockTimer extends Timer {
+ // Timer implementation for tests, driven by a MockTime; queued tasks run only inside advanceClock, on the calling thread.
+ val time = new MockTime
+ // mutable.PriorityQueue dequeues the greatest element first, so reverse the ordering to keep the earliest expiration at the head.
+ private val taskQueue = mutable.PriorityQueue[TimerTaskEntry]()(Ordering[TimerTaskEntry].reverse)
+
+ // A task with a non-positive delay runs immediately; otherwise it is queued with expiration = delay + current mock time.
+ def add(timerTask: TimerTask) {
+ if (timerTask.delayMs <= 0)
+ timerTask.run()
+ else
+ taskQueue.enqueue(new TimerTaskEntry(timerTask, timerTask.delayMs + time.milliseconds))
+ }
+
+ // Calls time.sleep(timeoutMs), then runs each non-cancelled task expiring strictly before the mock time; returns whether any ran.
+ def advanceClock(timeoutMs: Long): Boolean = {
+ time.sleep(timeoutMs)
+ var executed = false
+ val now = time.milliseconds
+
+ while (taskQueue.nonEmpty && now > taskQueue.head.expirationMs) {
+ val taskEntry = taskQueue.dequeue()
+ if (!taskEntry.cancelled) {
+ val task = taskEntry.timerTask
+ task.run()
+ executed = true
+ }
+ }
+
+ executed
+ }
+
+ def size: Int = taskQueue.size
+ override def shutdown(): Unit = {}
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
index a018dde..29c9067 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
@@ -18,11 +18,11 @@ package kafka.utils.timer
import org.junit.Assert._
import java.util.concurrent.atomic._
-import org.junit.{Test, After, Before}
+import org.junit.Test
class TimerTaskListTest {
- private class TestTask(val expirationMs: Long) extends TimerTask {
+ private class TestTask(val delayMs: Long) extends TimerTask {
def run(): Unit = { }
}
@@ -42,8 +42,8 @@ class TimerTaskListTest {
val list3 = new TimerTaskList(sharedCounter)
val tasks = (1 to 10).map { i =>
- val task = new TestTask(10L)
- list1.add(new TimerTaskEntry(task))
+ val task = new TestTask(0L)
+ list1.add(new TimerTaskEntry(task, 10L))
assertEquals(i, sharedCounter.get)
task
}.toSeq
@@ -54,7 +54,7 @@ class TimerTaskListTest {
tasks.take(4).foreach { task =>
val prevCount = sharedCounter.get
// new TimerTaskEntry(task) will remove the existing entry from the list
- list2.add(new TimerTaskEntry(task))
+ list2.add(new TimerTaskEntry(task, 10L))
assertEquals(prevCount, sharedCounter.get)
}
assertEquals(10 - 4, size(list1))
@@ -66,7 +66,7 @@ class TimerTaskListTest {
tasks.drop(4).foreach { task =>
val prevCount = sharedCounter.get
// new TimerTaskEntry(task) will remove the existing entry from the list
- list3.add(new TimerTaskEntry(task))
+ list3.add(new TimerTaskEntry(task, 10L))
assertEquals(prevCount, sharedCounter.get)
}
assertEquals(0, size(list1))
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4d3d286/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
index 95de378..54b73b8 100644
--- a/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala
@@ -27,7 +27,7 @@ import scala.util.Random
class TimerTest {
- private class TestTask(override val expirationMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask {
+ private class TestTask(override val delayMs: Long, id: Int, latch: CountDownLatch, output: ArrayBuffer[Int]) extends TimerTask {
private[this] val completed = new AtomicBoolean(false)
def run(): Unit = {
if (completed.compareAndSet(false, true)) {
@@ -37,32 +37,31 @@ class TimerTest {
}
}
- private[this] var executor: ExecutorService = null
+ private[this] var timer: Timer = null
@Before
def setup() {
- executor = Executors.newSingleThreadExecutor()
+ timer = new SystemTimer("test", tickMs = 1, wheelSize = 3)
}
@After
def teardown(): Unit = {
- executor.shutdown()
- executor = null
+ timer.shutdown()
}
@Test
def testAlreadyExpiredTask(): Unit = {
- val startTime = System.currentTimeMillis()
- val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime)
val output = new ArrayBuffer[Int]()
val latches = (-5 until 0).map { i =>
val latch = new CountDownLatch(1)
- timer.add(new TestTask(startTime + i, i, latch, output))
+ timer.add(new TestTask(i, i, latch, output))
latch
}
+ timer.advanceClock(0)
+
latches.take(5).foreach { latch =>
assertEquals("already expired tasks should run immediately", true, latch.await(3, TimeUnit.SECONDS))
}
@@ -72,8 +71,6 @@ class TimerTest {
@Test
def testTaskExpiration(): Unit = {
- val startTime = System.currentTimeMillis()
- val timer = new Timer(taskExecutor = executor, tickMs = 1, wheelSize = 3, startMs = startTime)
val output = new ArrayBuffer[Int]()
val tasks = new ArrayBuffer[TestTask]()
@@ -82,27 +79,27 @@ class TimerTest {
val latches =
(0 until 5).map { i =>
val latch = new CountDownLatch(1)
- tasks += new TestTask(startTime + i, i, latch, output)
+ tasks += new TestTask(i, i, latch, output)
ids += i
latch
} ++ (10 until 100).map { i =>
val latch = new CountDownLatch(2)
- tasks += new TestTask(startTime + i, i, latch, output)
- tasks += new TestTask(startTime + i, i, latch, output)
+ tasks += new TestTask(i, i, latch, output)
+ tasks += new TestTask(i, i, latch, output)
ids += i
ids += i
latch
} ++ (100 until 500).map { i =>
val latch = new CountDownLatch(1)
- tasks += new TestTask(startTime + i, i, latch, output)
+ tasks += new TestTask(i, i, latch, output)
ids += i
latch
}
// randomly submit requests
- Random.shuffle(tasks.toSeq).foreach { task => timer.add(task) }
+ tasks.foreach { task => timer.add(task) }
- while (timer.advanceClock(1000)) {}
+ while (timer.advanceClock(2000)) {}
latches.foreach { latch => latch.await() }