Posted to commits@kafka.apache.org by rs...@apache.org on 2018/12/05 09:10:30 UTC

[kafka] branch 2.1 updated: KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock (#5999)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 604716f  KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock (#5999)
604716f is described below

commit 604716f9fc6d697e3e8cdf1362a6fb0b3f239587
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Dec 5 09:05:26 2018 +0000

    KAFKA-7697: Process DelayedFetch without holding leaderIsrUpdateLock (#5999)
    
    Delayed fetch operations acquire the leaderIsrUpdate read lock of one or more of the Partitions referenced by the fetch request when attempting to complete the fetch operation. When appending new records, complete delayed fetch operations only after releasing the leaderIsrUpdate lock of the Partition to which the records were appended, to avoid deadlocks in request handler threads.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  6 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 96 +++++++++++++++++++++-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  6 ++
 3 files changed, 102 insertions(+), 6 deletions(-)
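
For context on why a cycle of read locks can deadlock at all: leaderIsrUpdateLock is a ReentrantReadWriteLock, and once a write-lock request is parked at the head of its wait queue, later read-lock requests from other threads queue up behind it instead of being granted alongside the existing readers. A minimal, standalone sketch of that JDK behaviour (illustrative names only, not Kafka code), using a timed tryLock so the example terminates:

    import java.util.concurrent.{CountDownLatch, TimeUnit}
    import java.util.concurrent.locks.ReentrantReadWriteLock

    object QueuedWriterBlocksReaders extends App {
      val lock = new ReentrantReadWriteLock()
      val readLockHeld = new CountDownLatch(1)

      // Thread 1: holds the read lock for a while (like a delayed fetch inspecting a Partition).
      val reader = new Thread {
        override def run(): Unit = {
          lock.readLock().lock()
          readLockHeld.countDown()
          Thread.sleep(2000)
          lock.readLock().unlock()
        }
      }
      reader.start()
      readLockHeld.await()

      // Thread 2: queues a write-lock request (like an ISR shrink waiting for the write lock).
      val writer = new Thread {
        override def run(): Unit = {
          lock.writeLock().lock()
          lock.writeLock().unlock()
        }
      }
      writer.start()
      Thread.sleep(200) // give the writer time to park in the lock's wait queue

      // A *new* reader now blocks behind the queued writer even though only a read
      // lock is currently held; this is what turns cross-partition read-lock
      // acquisition into a potential deadlock.
      val acquired = lock.readLock().tryLock(500, TimeUnit.MILLISECONDS)
      println(s"second reader acquired read lock: $acquired") // expected: false
      if (acquired) lock.readLock().unlock()

      reader.join()
      writer.join()
    }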

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index f7ed2ca..caca66e 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -739,8 +739,6 @@ class Partition(val topicPartition: TopicPartition,
           }
 
           val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
-          // probably unblock some follower fetch requests since log end offset has been updated
-          replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
           // we may need to increment high watermark since ISR could be down to 1
           (info, maybeIncrementLeaderHW(leaderReplica))
 
@@ -753,6 +751,10 @@ class Partition(val topicPartition: TopicPartition,
     // some delayed operations may be unblocked after HW changed
     if (leaderHWIncremented)
       tryCompleteDelayedRequests()
+    else {
+      // probably unblock some follower fetch requests since log end offset has been updated
+      replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topicPartition))
+    }
 
     info
   }
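
The hunk above leaves the append itself inside the leaderIsrUpdate read lock (as described in the commit message) but moves the tryCompleteDelayedFetch call to the code that runs after the locked block returns, alongside the existing tryCompleteDelayedRequests call for the high-watermark case. A simplified sketch of that shape, with hypothetical names rather than the real Partition internals:

    import java.util.concurrent.locks.ReentrantReadWriteLock

    // Hypothetical, stripped-down shape of the fix: work that may need the locks
    // of *other* partitions (completing delayed fetches) runs only after this
    // partition's read lock has been released.
    class PartitionLike(completeDelayedFetch: () => Unit,
                        completeDelayedRequests: () => Unit) {
      private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

      private def inReadLock[T](body: => T): T = {
        val rl = leaderIsrUpdateLock.readLock()
        rl.lock()
        try body finally rl.unlock()
      }

      // `append` returns whether the high watermark was incremented.
      def appendRecordsToLeader(append: () => Boolean): Unit = {
        val leaderHWIncremented = inReadLock {
          append() // no completion callbacks are invoked while the lock is held
        }
        // Outside the lock it is safe to complete delayed operations, even if
        // their completion acquires read locks of other Partitions.
        if (leaderHWIncremented) completeDelayedRequests()
        else completeDelayedFetch()
      }
    }

Keeping completion callbacks out of the locked region breaks the lock-ordering cycle without changing the locking granularity itself.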
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index e05f148..3ae980e 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -19,14 +19,14 @@ package kafka.cluster
 import java.io.File
 import java.nio.ByteBuffer
 import java.util.{Optional, Properties}
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.api.Request
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.log.{Defaults => _, _}
 import kafka.server._
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.utils.{CoreUtils, MockScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
@@ -38,7 +38,7 @@ import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, Li
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 import org.scalatest.Assertions.assertThrows
-import org.easymock.EasyMock
+import org.easymock.{Capture, EasyMock, IAnswer}
 
 import scala.collection.JavaConverters._
 
@@ -651,7 +651,95 @@ class PartitionTest {
     partition.updateReplicaLogReadResult(follower1Replica,
                                          readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset), batch3), leaderReplica))
     assertEquals("ISR", Set[Integer](leader, follower1, follower2), partition.inSyncReplicas.map(_.brokerId))
- }
+  }
+
+  /**
+   * Verify that delayed fetch operations which are completed when records are appended don't result in deadlocks.
+   * Delayed fetch operations acquire the Partition leaderIsrUpdate read lock of one or more partitions. So they
+   * need to be completed after releasing the lock acquired to append records. Otherwise, waiting writers
+   * (e.g. to check whether the ISR needs to be shrunk) can trigger a deadlock in request handler threads that
+   * wait for the read lock of one Partition while holding the read lock of another Partition.
+   */
+  @Test
+  def testDelayedFetchAfterAppendRecords(): Unit = {
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val controllerId = 0
+    val controllerEpoch = 0
+    val leaderEpoch = 5
+    val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
+    val isr = replicaIds
+    val logConfig = LogConfig(new Properties)
+
+    val topicPartitions = (0 until 5).map { i => new TopicPartition("test-topic", i) }
+    val logs = topicPartitions.map { tp => logManager.getOrCreateLog(tp, logConfig) }
+    val replicas = logs.map { log => new Replica(brokerId, log.topicPartition, time, log = Some(log)) }
+    val partitions = replicas.map { replica =>
+      val tp = replica.topicPartition
+      val partition = new Partition(tp,
+        isOffline = false,
+        replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+        localBrokerId = brokerId,
+        time,
+        replicaManager,
+        logManager,
+        zkClient)
+      partition.addReplicaIfNotExists(replica)
+      partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, brokerId,
+        leaderEpoch, isr, 1, replicaIds, true), 0)
+      partition
+    }
+
+    // Acquire leaderIsrUpdate read lock of a different partition when completing delayed fetch
+    val tpKey: Capture[TopicPartitionOperationKey] = EasyMock.newCapture()
+    EasyMock.expect(replicaManager.tryCompleteDelayedFetch(EasyMock.capture(tpKey)))
+      .andAnswer(new IAnswer[Unit] {
+        override def answer(): Unit = {
+          val anotherPartition = (tpKey.getValue.partition + 1) % topicPartitions.size
+          val partition = partitions(anotherPartition)
+          partition.fetchOffsetSnapshot(Optional.of(leaderEpoch), fetchOnlyFromLeader = true)
+        }
+      }).anyTimes()
+    EasyMock.replay(replicaManager, zkClient)
+
+    def createRecords(baseOffset: Long): MemoryRecords = {
+      val records = List(
+        new SimpleRecord("k1".getBytes, "v1".getBytes),
+        new SimpleRecord("k2".getBytes, "v2".getBytes))
+      val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+      val builder = MemoryRecords.builder(
+        buf, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, TimestampType.CREATE_TIME,
+        baseOffset, time.milliseconds, 0)
+      records.foreach(builder.append)
+      builder.build()
+    }
+
+    val done = new AtomicBoolean()
+    val executor = Executors.newFixedThreadPool(topicPartitions.size + 1)
+    try {
+      // Invoke some operation that acquires leaderIsrUpdate write lock on one thread
+      executor.submit(CoreUtils.runnable {
+        while (!done.get) {
+          partitions.foreach(_.maybeShrinkIsr(10000))
+        }
+      })
+      // Append records to partitions, one partition-per-thread
+      val futures = partitions.map { partition =>
+        executor.submit(CoreUtils.runnable {
+          (1 to 10000).foreach { _ => partition.appendRecordsToLeader(createRecords(baseOffset = 0), isFromClient = true) }
+        })
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+      done.set(true)
+    } catch {
+      case e: TimeoutException =>
+        val allThreads = TestUtils.allThreadStackTraces()
+        fail(s"Test timed out with exception $e, thread stack traces: $allThreads")
+    } finally {
+      executor.shutdownNow()
+      executor.awaitTermination(5, TimeUnit.SECONDS)
+    }
+  }
 
   def createRecords(records: Iterable[SimpleRecord], baseOffset: Long, partitionLeaderEpoch: Int = 0): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
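
The new test drives the real Partition and LogManager with a mocked ReplicaManager; the cycle it guards against can also be shown in isolation. In the standalone sketch below (illustrative names, not part of the patch), two threads each hold one read lock and then request the other's while writers are queued on both, which is the shape that maybeShrinkIsr plus cross-partition delayed-fetch completion could produce before the fix; a timed tryLock is used so the demonstration reports the blocked acquisition instead of actually deadlocking:

    import java.util.concurrent.{CountDownLatch, TimeUnit}
    import java.util.concurrent.locks.ReentrantReadWriteLock

    object CrossPartitionReadLockCycle extends App {
      val p1 = new ReentrantReadWriteLock() // stands in for partition 1's leaderIsrUpdateLock
      val p2 = new ReentrantReadWriteLock() // stands in for partition 2's leaderIsrUpdateLock
      val bothReadLocksHeld = new CountDownLatch(2)

      // A writer queued on each lock, like an ISR shrink waiting for the write lock.
      def writer(lock: ReentrantReadWriteLock): Thread = new Thread {
        override def run(): Unit = { lock.writeLock().lock(); lock.writeLock().unlock() }
      }

      // Each "request handler" holds one read lock and then asks for the other one,
      // as a delayed fetch completed under the append path used to do.
      def handler(held: ReentrantReadWriteLock, wanted: ReentrantReadWriteLock): Thread = new Thread {
        override def run(): Unit = {
          held.readLock().lock()
          bothReadLocksHeld.countDown()
          bothReadLocksHeld.await()
          Thread.sleep(300) // let the writers park in both wait queues
          // With a writer queued on `wanted`, this second read acquisition blocks;
          // an untimed lock() here is the deadlock, so use a timeout to observe it.
          val ok = wanted.readLock().tryLock(500, TimeUnit.MILLISECONDS)
          println(s"$getName acquired second read lock: $ok") // expected: false for both
          if (ok) wanted.readLock().unlock()
          held.readLock().unlock()
        }
      }

      val handlers = Seq(handler(p1, p2), handler(p2, p1))
      handlers.foreach(_.start())
      bothReadLocksHeld.await()
      val writers = Seq(writer(p1), writer(p2))
      writers.foreach(_.start())
      (handlers ++ writers).foreach(_.join())
    }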
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 9974230..9d5906f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -934,6 +934,12 @@ object TestUtils extends Logging {
     assertEquals(0, threadCount)
   }
 
+  def allThreadStackTraces(): String = {
+    Thread.getAllStackTraces.asScala.map { case (thread, stackTrace) =>
+      thread.getName + "\n\t" + stackTrace.toList.map(_.toString).mkString("\n\t")
+    }.mkString("\n")
+  }
+
   /**
    * Create new LogManager instance with default configuration for testing
    */