You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/03/04 09:36:44 UTC

[kafka] branch trunk updated: KAFKA-9632; Fix MockScheduler synchronization for safe use in Log/Partition tests (#8209)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f35e649  KAFKA-9632; Fix MockScheduler synchronization for safe use in Log/Partition tests (#8209)
f35e649 is described below

commit f35e6496aa003dc0667dc338001a5b51be78fe76
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Mar 4 09:36:06 2020 +0000

    KAFKA-9632; Fix MockScheduler synchronization for safe use in Log/Partition tests (#8209)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../scala/unit/kafka/utils/MockScheduler.scala     | 53 ++++++++++++++--------
 .../scala/unit/kafka/utils/SchedulerTest.scala     | 42 +++++++++++++++--
 2 files changed, 71 insertions(+), 24 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 75a089a..cfbbe0f 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.utils.Time
  * Incrementing the time to the exact next execution time of a task will result in that task executing (it is as if execution itself takes no time).
  */
 class MockScheduler(val time: Time) extends Scheduler {
-  
+
   /* a priority queue of tasks ordered by next execution time */
   private val tasks = new PriorityQueue[MockTask]()
   
@@ -44,10 +44,11 @@ class MockScheduler(val time: Time) extends Scheduler {
   def startup(): Unit = {}
   
   def shutdown(): Unit = {
-    this synchronized {
-      tasks.foreach(_.fun())
-      tasks.clear()
-    }
+    var currTask: Option[MockTask] = None
+    do {
+      currTask = poll(_ => true)
+      currTask.foreach(_.fun())
+    } while (currTask.nonEmpty)
   }
   
   /**
@@ -56,28 +57,26 @@ class MockScheduler(val time: Time) extends Scheduler {
    * If you are using the scheduler associated with a MockTime instance this call will be triggered automatically.
    */
   def tick(): Unit = {
-    this synchronized {
-      val now = time.milliseconds
-      while(tasks.nonEmpty && tasks.head.nextExecution <= now) {
-        /* pop and execute the task with the lowest next execution time */
-        val curr = tasks.dequeue
+    val now = time.milliseconds
+    var currTask: Option[MockTask] = None
+    /* pop and execute the task with the lowest next execution time if ready */
+    do {
+      currTask = poll(_.nextExecution <= now)
+      currTask.foreach { curr =>
         curr.fun()
         /* if the task is periodic, reschedule it and re-enqueue */
         if(curr.periodic) {
           curr.nextExecution += curr.period
-          this.tasks += curr
+          add(curr)
         }
       }
-    }
+    } while (currTask.nonEmpty)
   }
-  
+
   def schedule(name: String, fun: () => Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[Unit] = {
-    var task : MockTask = null
-    this synchronized {
-      task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time)
-      tasks += task
-      tick()
-    }
+    val task = MockTask(name, fun, time.milliseconds + delay, period = period, time=time)
+    add(task)
+    tick()
     task
   }
 
@@ -86,7 +85,21 @@ class MockScheduler(val time: Time) extends Scheduler {
       tasks.clear()
     }
   }
-  
+
+  private def poll(predicate: MockTask => Boolean): Option[MockTask] = {
+    this synchronized {
+      if (tasks.nonEmpty && predicate.apply(tasks.head))
+        Some(tasks.dequeue)
+      else
+        None
+    }
+  }
+
+  private def add(task: MockTask): Unit = {
+    this synchronized {
+      tasks += task
+    }
+  }
 }
 
 case class MockTask(name: String, fun: () => Unit, var nextExecution: Long, period: Long, time: Time) extends ScheduledFuture[Unit] {
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index 201216c..42241de 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -16,13 +16,15 @@
  */
 package kafka.utils
 
-import org.junit.Assert._
+import java.util.Properties
 import java.util.concurrent.atomic._
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+
 import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
-import org.junit.{After, Before, Test}
 import kafka.utils.TestUtils.retry
-import java.util.Properties
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
 
 class SchedulerTest {
 
@@ -129,4 +131,36 @@ class SchedulerTest {
     assertTrue(!(scheduler.taskRunning(log.producerExpireCheck)))
   }
 
-}
\ No newline at end of file
+  /**
+   * Verify that scheduler lock is not held when invoking task method, allowing new tasks to be scheduled
+   * when another is being executed. This is required to avoid deadlocks when:
+   *   a) Thread1 executes a task which attempts to acquire LockA
+   *   b) Thread2 holding LockA attempts to schedule a new task
+   */
+  @Test(timeout = 15000)
+  def testMockSchedulerLocking(): Unit = {
+    val initLatch = new CountDownLatch(1)
+    val completionLatch = new CountDownLatch(2)
+    val taskLatches = List(new CountDownLatch(1), new CountDownLatch(1))
+    def scheduledTask(taskLatch: CountDownLatch): Unit = {
+      initLatch.countDown()
+      assertTrue("Timed out waiting for latch", taskLatch.await(30, TimeUnit.SECONDS))
+      completionLatch.countDown()
+    }
+    mockTime.scheduler.schedule("test1", () => scheduledTask(taskLatches.head), delay=1)
+    val tickExecutor = Executors.newSingleThreadScheduledExecutor()
+    try {
+      tickExecutor.scheduleWithFixedDelay(() => mockTime.sleep(1), 0, 1, TimeUnit.MILLISECONDS)
+
+      // wait for first task to execute and then schedule the next task while the first one is running
+      assertTrue(initLatch.await(10, TimeUnit.SECONDS))
+      mockTime.scheduler.schedule("test2", () => scheduledTask(taskLatches(1)), delay = 1)
+
+      taskLatches.foreach(_.countDown())
+      assertTrue("Tasks did not complete", completionLatch.await(10, TimeUnit.SECONDS))
+
+    } finally {
+      tickExecutor.shutdownNow()
+    }
+  }
+}