You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/12/05 09:05:38 UTC

[kafka] branch trunk updated: KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock (#5999)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d4cf64  KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock (#5999)
0d4cf64 is described below

commit 0d4cf64af359d22749f7e865c4efaee773d64962
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Dec 5 09:05:26 2018 +0000

    KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock (#5999)
    
    Delayed fetch operations acquire leaderIsrUpdate read lock of one or more Partitions from the fetch request when attempting to complete the fetch operation. While appending new records, complete fetch requests after releasing leaderIsrUpdate of the Partition to which records were appended to avoid deadlocks in request handler threads.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 96 +++++++++++++++++++++-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  6 ++
 3 files changed, 102 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 745c89a..1f52bd7 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -740,8 +740,6 @@ class Partition(val topicPartition: TopicPartition,
           }
 
           val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
-          // probably unblock some follower fetch requests since log end offset has been updated
-          replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           (info, maybeIncrementLeaderHW(leaderReplica))
 
@@ -754,6 +752,10 @@ class Partition(val topicPartition: TopicPartition,
     // some delayed operations may be unblocked after HW changed
     if (leaderHWIncremented)
       tryCompleteDelayedRequests()
+    else {
+      // probably unblock some follower fetch requests since log end offset has been updated
+      replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition))
+    }
 
     info
   }
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 6e38ca9..cfaa147 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -19,14 +19,14 @@ package kafka.cluster
 import java.io.File
 import java.nio.ByteBuffer
 import java.util.{Optional, Properties}
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.api.Request
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.server._
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.utils.{CoreUtils, MockScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
@@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, Li
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.scalatest.Assertions.assertThrows
-import org.easymock.EasyMock
+import org.easymock.{Capture, EasyMock, IAnswer}
 
 import scala.collection.JavaConverters._
 
@@ -671,7 +671,95 @@ class PartitionTest {
     partition.updateReplicaLogReadResult(follower1Replica,
                                          readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
     assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
- }
+  }
+
+  /**
+   * Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
+   * Delayed fetch operations acquire Partition leaderIsrUpdate read lock for one or more partitions. So they
+   * need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
+   * (e.g. to check if ISR needs to be shrinked) can trigger deadlock in request handler threads waiting for
+   * read lock of one Partition while holding on to read lock of another Partition.
+   */
+  @Test
+  def testDelayedFetchAfterAppendRecords(): Unit = {
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val controllerId = 0
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
+    val isr = replicaIds
+    val logConfig = LogConfig(new Properties)
+
+    val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
+    val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) }
+    val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) }
+    val partitions = replicas.map { replica =>
+      val tp = replica.topicPartition
+      val partition = new Partition(tp,
+        isOffline = false,
+        replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+        localBrokerId = brokerId,
+        time,
+        replicaManager,
+        logManager,
+        zkClient)
+      partition.addReplicaIfNotExists(replica)
+      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
+        leaderEpoch, isr, 1, replicaIds, true), 0)
+      partition
+    }
+
+    // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
+    val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture()
+    EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey)))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          val anotherPartition = (tpKey.getValue.partition + 1) % topicPartitions.size
+          val partition = partitions(anotherPartition)
+          partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
+        }
+      }).anyTimes()
+    EasyMock.replay(replicaManager, zkClient)
+
+    def createRecords(baseOffset: Long): MemoryRecords = {
+      val records = List(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes))
+      val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+      val builder = MemoryRecords.builder(
+        buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
+        baseOffset, time.milliseconds, 0)
+      records.foreach(builder.append)
+      builder.build()
+    }
+
+    val done = new AtomicBoolean()
+    val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
+    try {
+      // Invoke some operation that acquires leaderIsrUpdate write lock on one thread
+      executor.submit(CoreUtils.runnable {
+        while (!done.get) {
+          partitions.foreach(_.maybeShrinkIsr(10000))
+        }
+      })
+      // Append records to partitions, one partition-per-thread
+      val futures = partitions.map { partition =>
+        executor.submit(CoreUtils.runnable {
+          (1 to 10000).foreach { _ => partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient = true) }
+        })
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+      done.set(true)
+    } catch {
+      case e: TimeoutException =>
+        val allThreads = TestUtils.allThreadStackTraces()
+        fail(s"Test timed out with exception $e, thread stack traces: $allThreads")
+    } finally {
+      executor.shutdownNow()
+      executor.awaitTermination(5, TimeUnit.SECONDS)
+    }
+  }
 
   def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bcb0581..e5ea6a4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -958,6 +958,12 @@ object TestUtils extends Logging {
     assertEquals(0, threadCount)
   }
 
+  def allThreadStackTraces(): String = {
+    Thread.getAllStackTraces.asScala.map { case (thread, stackTrace) =>
+      thread.getName + "\n\t" + stackTrace.toList.map(_.toString).mkString("\n\t")
+    }.mkString("\n")
+  }
+
   /**
    * Create new LogManager instance with default configuration for testing
    */