You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/03/04 09:36:44 UTC
[kafka] branch trunk updated: KAFKA-9632;
Fix MockScheduler synchronization for safe use in Log/Partition
tests (#8209)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f35e649 KAFKA-9632; Fix MockScheduler synchronization for safe use in Log/Partition tests (#8209)
f35e649 is described below
commit f35e6496aa003dc0667dc338001a5b51be78fe76
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Mar 4 09:36:06 2020 +0000
KAFKA-9632; Fix MockScheduler synchronization for safe use in Log/Partition tests (#8209)
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../scala/unit/kafka/utils/MockScheduler.scala | 53 ++++++++++++++--------
.../scala/unit/kafka/utils/SchedulerTest.scala | 42 +++++++++++++++--
2 files changed, 71 insertions(+), 24 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 75a089a..cfbbe0f 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Time
* Incrementing the time to the exact next execution time of a task will result in that task executing (it is as if execution itself takes no time).
*/
class MockScheduler(val time: Time) extends Scheduler {
-
+
/* a priority queue of tasks ordered by next execution time */
private val tasks = new PriorityQueue[MockTask]()
@@ -44,10 +44,11 @@ class MockScheduler(val time: Time) extends Scheduler {
def startup(): Unit = {}
def shutdown(): Unit = {
- this synchronized {
- tasks.foreach(_.fun())
- tasks.clear()
- }
+ var currTask: Option[MockTask] = None
+ do {
+ currTask = poll(_ => true)
+ currTask.foreach(_.fun())
+ } while (currTask.nonEmpty)
}
/**
@@ -56,28 +57,26 @@ class MockScheduler(val time: Time) extends Scheduler {
* If you are using the scheduler associated with a MockTime instance this call will be triggered automatically.
*/
def tick(): Unit = {
- this synchronized {
- val now = time.milliseconds
- while(tasks.nonEmpty && tasks.head.nextExecution <= now) {
- /* pop and execute the task with the lowest next execution time */
- val curr = tasks.dequeue
+ val now = time.milliseconds
+ var currTask: Option[MockTask] = None
+ /* pop and execute the task with the lowest next execution time if ready */
+ do {
+ currTask = poll(_.nextExecution <= now)
+ currTask.foreach { curr =>
curr.fun()
/* if the task is periodic, reschedule it and re-enqueue */
if(curr.periodic) {
curr.nextExecution += curr.period
- this.tasks += curr
+ add(curr)
}
}
- }
+ } while (currTask.nonEmpty)
}
-
+
def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[Unit] = {
- var task : MockTask = null
- this synchronized {
- task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time)
- tasks += task
- tick()
- }
+ val task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time)
+ add(task)
+ tick()
task
}
@@ -86,7 +85,21 @@ class MockScheduler(val time: Time) extends Scheduler {
tasks.clear()
}
}
-
+
+ private def poll(predicate: MockTask => Boolean): Option[MockTask] = {
+ this synchronized {
+ if (tasks.nonEmpty && predicate.apply(tasks.head))
+ Some(tasks.dequeue)
+ else
+ None
+ }
+ }
+
+ private def add(task: MockTask): Unit = {
+ this synchronized {
+ tasks += task
+ }
+ }
}
case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long, time: Time) extends ScheduledFuture[Unit] {
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 201216c..42241de 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -16,13 +16,15 @@
*/
package kafka.utils
-import org.junit.Assert._
+import java.util.Properties
import java.util.concurrent.atomic._
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+
import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
-import org.junit.{After, Before, Test}
import kafka.utils.TestUtils.retry
-import java.util.Properties
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
class SchedulerTest {
@@ -129,4 +131,36 @@ class SchedulerTest {
assertTrue(!(scheduler.taskRunning(log.producerExpireCheck)))
}
-}
\ No newline at end of file
+ /**
+ * Verify that scheduler lock is not held when invoking task method, allowing new tasks to be scheduled
+ * when another is being executed. This is required to avoid deadlocks when:
+ * a) Thread1 executes a task which attempts to acquire LockA
+ * b) Thread2 holding LockA attempts to schedule a new task
+ */
+ @Test(timeout = 15000)
+ def testMockSchedulerLocking(): Unit = {
+ val initLatch = new CountDownLatch(1)
+ val completionLatch = new CountDownLatch(2)
+ val taskLatches = List(new CountDownLatch(1), new CountDownLatch(1))
+ def scheduledTask(taskLatch: CountDownLatch): Unit = {
+ initLatch.countDown()
+ assertTrue("Timed out waiting for latch", taskLatch.await(30, TimeUnit.SECONDS))
+ completionLatch.countDown()
+ }
+ mockTime.scheduler.schedule("test1", () => scheduledTask(taskLatches.head), delay=1)
+ val tickExecutor = Executors.newSingleThreadScheduledExecutor()
+ try {
+ tickExecutor.scheduleWithFixedDelay(() => mockTime.sleep(1), 0, 1, TimeUnit.MILLISECONDS)
+
+ // wait for first task to execute and then schedule the next task while the first one is running
+ assertTrue(initLatch.await(10, TimeUnit.SECONDS))
+ mockTime.scheduler.schedule("test2", () => scheduledTask(taskLatches(1)), delay = 1)
+
+ taskLatches.foreach(_.countDown())
+ assertTrue("Tasks did not complete", completionLatch.await(10, TimeUnit.SECONDS))
+
+ } finally {
+ tickExecutor.shutdownNow()
+ }
+ }
+}