You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/15 07:09:53 UTC

[kafka] branch trunk updated: KAFKA-6653; Complete delayed operations even when there is lock contention (#4704)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5cdb951  KAFKA-6653; Complete delayed operations even when there is lock contention (#4704)
5cdb951 is described below

commit 5cdb951091129b530326140aa711f0b61c446f06
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Thu Mar 15 07:09:48 2018 +0000

    KAFKA-6653; Complete delayed operations even when there is lock contention (#4704)
    
    If there is lock contention while multiple threads check whether a delayed operation can be completed (e.g. a produce request with acks=-1), each thread attempts completion only if it can acquire the lock without blocking, to avoid deadlocks. This leaves a timing window in which an operation becomes ready to complete after another thread has acquired the lock and performed the check for completion, but before that thread has released the lock; in that case neither thread completes the operation. This PR adds a flag to ensure that the operation is completed in this case.
---
 .../main/scala/kafka/server/DelayedOperation.scala |  37 +++-
 .../unit/kafka/server/DelayedOperationTest.scala   | 246 ++++++++++++++-------
 2 files changed, 197 insertions(+), 86 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 894d30e..2a096e1 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -47,6 +47,7 @@ abstract class DelayedOperation(override val delayMs: Long,
     lockOpt: Option[Lock] = None) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
+  private val tryCompletePending = new AtomicBoolean(false)
   // Visible for testing
   private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
 
@@ -101,16 +102,38 @@ abstract class DelayedOperation(override val delayMs: Long,
   /**
    * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
    * without blocking.
+   *
+   * If threadA acquires the lock and performs the check for completion before the completion criteria are met
+   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
+   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
+   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
+   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
+   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
+   * the operation is actually completed.
    */
   private[server] def maybeTryComplete(): Boolean = {
-    if (lock.tryLock()) {
-      try {
-        tryComplete()
-      } finally {
-        lock.unlock()
+    var retry = false
+    var done = false
+    do {
+      if (lock.tryLock()) {
+        try {
+          tryCompletePending.set(false)
+          done = tryComplete()
+        } finally {
+          lock.unlock()
+        }
+        // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
+        // `tryCompletePending`. In this case we should retry.
+        retry = tryCompletePending.get()
+      } else {
+        // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
+        // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
+        // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
+        // released the lock and returned by the time the flag is set.
+        retry = !tryCompletePending.getAndSet(true)
       }
-    } else
-      false
+    } while (!isCompleted && retry)
+    done
   }
 
   /*
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index d4d79e5..3b077a0 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -17,11 +17,13 @@
 
 package kafka.server
 
-import java.util.concurrent.{Executors, Future}
+import java.util.Random
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
 
 import kafka.utils.CoreUtils.inLock
-
+import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
@@ -29,6 +31,7 @@ import org.junit.Assert._
 class DelayedOperationTest {
 
   var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
+  var executorService: ExecutorService = null
 
   @Before
   def setUp() {
@@ -38,6 +41,8 @@ class DelayedOperationTest {
   @After
   def tearDown() {
     purgatory.shutdown()
+    if (executorService != null)
+      executorService.shutdown()
   }
 
   @Test
@@ -122,6 +127,94 @@ class DelayedOperationTest {
     assertEquals(Nil, cancelledOperations)
   }
 
+  /**
+    * Verify that if there is lock contention between two threads attempting to complete,
+    * completion is performed without any blocking in either thread.
+    */
+  @Test
+  def testTryCompleteLockContention(): Unit = {
+    executorService = Executors.newSingleThreadExecutor()
+    val completionAttemptsRemaining = new AtomicInteger(Int.MaxValue)
+    val tryCompleteSemaphore = new Semaphore(1)
+    val key = "key"
+
+    val op = new MockDelayedOperation(100000L, None, None) {
+      override def tryComplete() = {
+        val shouldComplete = completionAttemptsRemaining.decrementAndGet <= 0
+        tryCompleteSemaphore.acquire()
+        try {
+          if (shouldComplete)
+            forceComplete()
+          else
+            false
+        } finally {
+          tryCompleteSemaphore.release()
+        }
+      }
+    }
+
+    purgatory.tryCompleteElseWatch(op, Seq(key))
+    completionAttemptsRemaining.set(2)
+    tryCompleteSemaphore.acquire()
+    val future = runOnAnotherThread(purgatory.checkAndComplete(key), shouldComplete = false)
+    TestUtils.waitUntilTrue(() => tryCompleteSemaphore.hasQueuedThreads, "Not attempting to complete")
+    purgatory.checkAndComplete(key) // this should not block even though lock is not free
+    assertFalse("Operation should not have completed", op.isCompleted)
+    tryCompleteSemaphore.release()
+    future.get(10, TimeUnit.SECONDS)
+    assertTrue("Operation should have completed", op.isCompleted)
+  }
+
+  /**
+    * Test `tryComplete` with multiple threads to verify that there is no timing window in
+    * which completion is skipped, even when the thread that makes the operation completable
+    * cannot acquire the operation lock. Since it is difficult to test all scenarios,
+    * this test uses random delays with a large number of threads.
+    */
+  @Test
+  def testTryCompleteWithMultipleThreads(): Unit = {
+    val executor = Executors.newScheduledThreadPool(20)
+    this.executorService = executor
+    val random = new Random
+    val maxDelayMs = 10
+    val completionAttempts = 20
+
+    class TestDelayOperation(index: Int) extends MockDelayedOperation(10000L) {
+      val key = s"key$index"
+      val completionAttemptsRemaining = new AtomicInteger(completionAttempts)
+
+      override def tryComplete(): Boolean = {
+        val shouldComplete = completable
+        Thread.sleep(random.nextInt(maxDelayMs))
+        if (shouldComplete)
+          forceComplete()
+        else
+          false
+      }
+    }
+    val ops = (0 until 100).map { index =>
+      val op = new TestDelayOperation(index)
+      purgatory.tryCompleteElseWatch(op, Seq(op.key))
+      op
+    }
+
+    def scheduleTryComplete(op: TestDelayOperation, delayMs: Long): Future[_] = {
+      executor.schedule(new Runnable {
+        override def run(): Unit = {
+          if (op.completionAttemptsRemaining.decrementAndGet() == 0)
+            op.completable = true
+          purgatory.checkAndComplete(op.key)
+        }
+      }, delayMs, TimeUnit.MILLISECONDS)
+    }
+
+    (1 to completionAttempts).flatMap { _ =>
+      ops.map { op => scheduleTryComplete(op, random.nextInt(maxDelayMs)) }
+    }.foreach { future => future.get }
+
+    ops.foreach { op => assertTrue("Operation should have completed", op.isCompleted) }
+  }
+
   @Test
   def testDelayedOperationLock() {
     verifyDelayedOperationLock(new MockDelayedOperation(100000L), mismatchedLocks = false)
@@ -141,102 +234,97 @@ class DelayedOperationTest {
 
   def verifyDelayedOperationLock(mockDelayedOperation: => MockDelayedOperation, mismatchedLocks: Boolean) {
     val key = "key"
-    val executorService = Executors.newSingleThreadExecutor
-    try {
-      def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
-        (1 to count).map { _ =>
-          val op = mockDelayedOperation
-          purgatory.tryCompleteElseWatch(op, Seq(key))
-          assertFalse("Not completable", op.isCompleted)
-          op
-        }
+    executorService = Executors.newSingleThreadExecutor
+    def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
+      (1 to count).map { _ =>
+        val op = mockDelayedOperation
+        purgatory.tryCompleteElseWatch(op, Seq(key))
+        assertFalse("Not completable", op.isCompleted)
+        op
       }
+    }
 
-      def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
-        (1 to count).map { _ =>
-          val op = mockDelayedOperation
-          op.completable = true
-          op
-        }
+    def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
+      (1 to count).map { _ =>
+        val op = mockDelayedOperation
+        op.completable = true
+        op
       }
+    }
 
-      def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = {
-        val future = executorService.submit(new Runnable {
-          def run() = fun
-        })
-        if (shouldComplete)
-          future.get()
-        else
-          assertFalse("Should not have completed", future.isDone)
-        future
-      }
+    def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = {
+      completableOps.foreach(op => op.completable = true)
+      val completed = purgatory.checkAndComplete(key)
+      assertEquals(expectedComplete.size, completed)
+      expectedComplete.foreach(op => assertTrue("Should have completed", op.isCompleted))
+      val expectedNotComplete = completableOps.toSet -- expectedComplete
+      expectedNotComplete.foreach(op => assertFalse("Should not have completed", op.isCompleted))
+    }
 
-      def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = {
-        completableOps.foreach(op => op.completable = true)
-        val completed = purgatory.checkAndComplete(key)
-        assertEquals(expectedComplete.size, completed)
-        expectedComplete.foreach(op => assertTrue("Should have completed", op.isCompleted))
-        val expectedNotComplete = completableOps.toSet -- expectedComplete
-        expectedNotComplete.foreach(op => assertFalse("Should not have completed", op.isCompleted))
-      }
+    // If locks are free all completable operations should complete
+    var ops = createDelayedOperations(2)
+    checkAndComplete(ops, ops)
 
-      // If locks are free all completable operations should complete
-      var ops = createDelayedOperations(2)
+    // Lock held by current thread, completable operations should complete
+    ops = createDelayedOperations(2)
+    inLock(ops(1).lock) {
       checkAndComplete(ops, ops)
+    }
 
-      // Lock held by current thread, completable operations should complete
-      ops = createDelayedOperations(2)
-      inLock(ops(1).lock) {
-        checkAndComplete(ops, ops)
-      }
+    // Lock held by another thread, should not block, only operations that can be
+    // locked without blocking on the current thread should complete
+    ops = createDelayedOperations(2)
+    runOnAnotherThread(ops(0).lock.lock(), true)
+    try {
+      checkAndComplete(ops, Seq(ops(1)))
+    } finally {
+      runOnAnotherThread(ops(0).lock.unlock(), true)
+      checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+    }
 
-      // Lock held by another thread, should not block, only operations that can be
-      // locked without blocking on the current thread should complete
-      ops = createDelayedOperations(2)
-      runOnAnotherThread(ops(0).lock.lock(), true)
+    // Lock acquired by response callback held by another thread, should not block
+    // if the response lock is used as operation lock, only operations
+    // that can be locked without blocking on the current thread should complete
+    ops = createDelayedOperations(2)
+    ops(0).responseLockOpt.foreach { lock =>
+      runOnAnotherThread(lock.lock(), true)
       try {
-        checkAndComplete(ops, Seq(ops(1)))
-      } finally {
-        runOnAnotherThread(ops(0).lock.unlock(), true)
-        checkAndComplete(Seq(ops(0)), Seq(ops(0)))
-      }
-
-      // Lock acquired by response callback held by another thread, should not block
-      // if the response lock is used as operation lock, only operations
-      // that can be locked without blocking on the current thread should complete
-      ops = createDelayedOperations(2)
-      ops(0).responseLockOpt.foreach { lock =>
-        runOnAnotherThread(lock.lock(), true)
         try {
-          try {
-            checkAndComplete(ops, Seq(ops(1)))
-            assertFalse("Should have failed with mismatched locks", mismatchedLocks)
-          } catch {
-            case e: IllegalStateException =>
-              assertTrue("Should not have failed with valid locks", mismatchedLocks)
-          }
-        } finally {
-          runOnAnotherThread(lock.unlock(), true)
-          checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+          checkAndComplete(ops, Seq(ops(1)))
+          assertFalse("Should have failed with mismatched locks", mismatchedLocks)
+        } catch {
+          case e: IllegalStateException =>
+            assertTrue("Should not have failed with valid locks", mismatchedLocks)
         }
+      } finally {
+        runOnAnotherThread(lock.unlock(), true)
+        checkAndComplete(Seq(ops(0)), Seq(ops(0)))
       }
+    }
 
-      // Immediately completable operations should complete without locking
-      ops = createCompletableOperations(2)
-      ops.foreach { op =>
-        assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key)))
-        assertTrue("Should have completed", op.isCompleted)
-      }
-
-    } finally {
-      executorService.shutdown()
+    // Immediately completable operations should complete without locking
+    ops = createCompletableOperations(2)
+    ops.foreach { op =>
+      assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key)))
+      assertTrue("Should have completed", op.isCompleted)
     }
   }
 
+  private def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = {
+    val future = executorService.submit(new Runnable {
+      def run() = fun
+    })
+    if (shouldComplete)
+      future.get()
+    else
+      assertFalse("Should not have completed", future.isDone)
+    future
+  }
 
   class MockDelayedOperation(delayMs: Long,
-      lockOpt: Option[ReentrantLock] = None,
-      val responseLockOpt: Option[ReentrantLock] = None) extends DelayedOperation(delayMs, lockOpt) {
+                             lockOpt: Option[ReentrantLock] = None,
+                             val responseLockOpt: Option[ReentrantLock] = None)
+                             extends DelayedOperation(delayMs, lockOpt) {
     var completable = false
 
     def awaitExpiration() {

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.