Posted to commits@kafka.apache.org by da...@apache.org on 2022/03/31 12:55:56 UTC

[kafka] branch 3.2 updated: KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized (#11953)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 0f25205  KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized (#11953)
0f25205 is described below

commit 0f25205ab48d110d56f4d068daafd297fb8e5b82
Author: Yu <yu...@live.com>
AuthorDate: Thu Mar 31 20:45:59 2022 +0800

    KAFKA-13772: Partitions are not correctly re-partitioned when the fetcher thread pool is resized (#11953)
    
    Partitions are assigned to fetcher threads based on their hash modulo the number of fetcher threads. When we resize the fetcher thread pool, we re-distribute all the partitions based on the new fetcher thread pool size. The issue is that the logic that resizes the fetcher thread pool updates the `fetcherThreadMap` while iterating over it. The `Map` does not give any guarantee in this case - especially when the underlying map is re-hashed - and that led to not iterating over [...]
    
    Reviewers: Luke Chen <sh...@gmail.com>, David Jacot <dj...@confluent.io>
---
 .../kafka/server/AbstractFetcherManager.scala      |  23 ++--
 .../scala/kafka/server/AbstractFetcherThread.scala |  12 +++
 .../kafka/server/AbstractFetcherManagerTest.scala  | 119 ++++++++++++++++++++-
 3 files changed, 145 insertions(+), 9 deletions(-)
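
For context: the assignment rule mentioned in the commit message maps each partition to a fetcher thread by hashing the partition and taking the result modulo the size of the fetcher thread pool, so resizing the pool moves most partitions to a different fetcher thread. The sketch below is illustrative only and assumes a hash along these lines; the exact formula in AbstractFetcherManager.getFetcherId may differ.

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.utils.Utils

    // Illustrative only: hash-modulo assignment of a partition to a fetcher id.
    def fetcherIdFor(tp: TopicPartition, poolSize: Int): Int =
      Utils.abs(31 * tp.topic.hashCode + tp.partition) % poolSize

    val tp = new TopicPartition("topic_0", 42)
    // The id generally changes when the pool is resized, e.g. from 10 to 50
    // threads, which is why resizeThreadPool must re-distribute every partition.
    println(fetcherIdFor(tp, 10))
    println(fetcherIdFor(tp, 50))

Because every partition can move, the old code called addFetcherForPartitions inside the loop over fetcherThreadMap, which could add or rehash entries in the very map being iterated. The patch below instead collects all removed partitions into allRemovedPartitionsMap and re-adds them only after the iteration has completed.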

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 7780535..0843fe8 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
 
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
+      val allRemovedPartitionsMap = mutable.Map[TopicPartition, InitialFetchState]()
       fetcherThreadMap.forKeyValue { (id, thread) =>
-        val partitionStates = removeFetcherForPartitions(thread.partitions)
+        val partitionStates = thread.removeAllPartitions()
         if (id.fetcherId >= newSize)
           thread.shutdown()
-        val fetchStates = partitionStates.map { case (topicPartition, currentFetchState) =>
-          val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.sourceBroker,
-            currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
-            initOffset = currentFetchState.fetchOffset)
-          topicPartition -> initialFetchState
+        partitionStates.forKeyValue { (topicPartition, currentFetchState) =>
+            val initialFetchState = InitialFetchState(currentFetchState.topicId, thread.sourceBroker,
+              currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
+              initOffset = currentFetchState.fetchOffset)
+            allRemovedPartitionsMap += topicPartition -> initialFetchState
         }
-        addFetcherForPartitions(fetchStates)
       }
+      // failed partitions are removed when adding partitions to fetcher
+      addFetcherForPartitions(allRemovedPartitionsMap)
     }
+
     lock synchronized {
       val currentSize = numFetchersPerBroker
       info(s"Resizing fetcher thread pool size from $currentSize to $newSize")
@@ -145,7 +148,7 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
           case None =>
             addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
         }
-
+        // failed partitions are removed when adding partitions to the thread
         addPartitionsToFetcherThread(fetcherThread, initialFetchOffsets)
       }
     }
@@ -251,6 +254,10 @@ class FailedPartitions {
   def contains(topicPartition: TopicPartition): Boolean = synchronized {
     failedPartitionsSet.contains(topicPartition)
   }
+
+  def partitions(): Set[TopicPartition] = synchronized {
+    failedPartitionsSet.toSet
+  }
 }
 
 case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 492cec4..507d6bb 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -743,6 +743,18 @@ abstract class AbstractFetcherThread(name: String,
     } finally partitionMapLock.unlock()
   }
 
+  def removeAllPartitions(): Map[TopicPartition, PartitionFetchState] = {
+    partitionMapLock.lockInterruptibly()
+    try {
+      val allPartitionState = partitionStates.partitionStateMap.asScala.toMap
+      allPartitionState.keys.foreach { tp =>
+        partitionStates.remove(tp)
+        fetcherLagStats.unregister(tp)
+      }
+      allPartitionState
+    } finally partitionMapLock.unlock()
+  }
+
   def partitionCount: Int = {
     partitionMapLock.lockInterruptibly()
     try partitionStates.size
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index 647f8ae..875566e 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -19,12 +19,19 @@ package kafka.server
 import com.yammer.metrics.core.Gauge
 import kafka.cluster.BrokerEndPoint
 import kafka.metrics.KafkaYammerMetrics
+import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
+import kafka.utils.Implicits.MapExtensionMethods
 import kafka.utils.TestUtils
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.junit.jupiter.api.{BeforeEach, Test}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{BeforeEach, Test}
 import org.mockito.Mockito.{mock, verify, when}
 
+import scala.collection.{Map, Set, mutable}
 import scala.jdk.CollectionConverters._
 
 class AbstractFetcherManagerTest {
@@ -100,6 +107,7 @@ class AbstractFetcherManagerTest {
     fetcherManager.removeFetcherForPartitions(Set(tp))
     assertEquals(0, getMetricValue(metricName))
   }
+
   @Test
   def testDeadThreadCountMetric(): Unit = {
     val fetcher: AbstractFetcherThread = mock(classOf[AbstractFetcherThread])
@@ -210,4 +218,113 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    testResizeThreadPool(10, 50)
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    testResizeThreadPool(50, 10)
+  }
+
+  private def testResizeThreadPool(currentFetcherSize: Int, newFetcherSize: Int, brokerNum: Int = 6): Unit = {
+    val fetchingTopicPartitions = makeTopicPartition(10, 100)
+    val failedTopicPartitions = makeTopicPartition(2, 5, "topic_failed")
+    val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", currentFetcherSize) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker, failedPartitions)
+      }
+    }
+    try {
+      fetcherManager.addFetcherForPartitions(fetchingTopicPartitions.map { tp =>
+        val brokerId = getBrokerId(tp, brokerNum)
+        val brokerEndPoint = new BrokerEndPoint(brokerId, s"kafka-host-$brokerId", 9092)
+        tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+      }.toMap)
+
+      // Mark some of these partitions failed within resizing scope
+      fetchingTopicPartitions.take(20).foreach(fetcherManager.addFailedPartition)
+      // Mark failed partitions out of resizing scope
+      failedTopicPartitions.foreach(fetcherManager.addFailedPartition)
+
+      fetcherManager.resizeThreadPool(newFetcherSize)
+
+      val ownedPartitions = mutable.Set.empty[TopicPartition]
+      fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, fetcherThread) =>
+        val fetcherId = brokerIdAndFetcherId.fetcherId
+        val brokerId = brokerIdAndFetcherId.brokerId
+
+        fetcherThread.partitions.foreach { tp =>
+          ownedPartitions += tp
+          assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+          assertEquals(getBrokerId(tp, brokerNum), brokerId)
+        }
+      }
+      // Verify that all partitions are owned by the fetcher threads.
+      assertEquals(fetchingTopicPartitions, ownedPartitions)
+
+      // Only failed partitions should still be kept after resizing
+      assertEquals(failedTopicPartitions, fetcherManager.failedPartitions.partitions())
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+
+  private def makeTopicPartition(topicNum: Int, partitionNum: Int, topicPrefix: String = "topic_"): Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    for (i <- 0 to topicNum - 1) {
+      val topic = topicPrefix + i
+      for (j <- 0 to partitionNum - 1) {
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  private def getBrokerId(tp: TopicPartition, brokerNum: Int): Int = {
+    Utils.abs(tp.hashCode) % brokerNum
+  }
+
+  private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions)
+    extends AbstractFetcherThread(
+      name = "test-resize-fetcher",
+      clientId = "mock-fetcher",
+      sourceBroker,
+      failedPartitions,
+      fetchBackOffMs = 0,
+      brokerTopicStats = new BrokerTopicStats) {
+
+    override protected def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
+      None
+    }
+
+    override protected def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {}
+
+    override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {}
+
+    override protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = ResultWithPartitions(None, Set.empty)
+
+    override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = Some(0)
+
+    override protected def logStartOffset(topicPartition: TopicPartition): Long = 1
+
+    override protected def logEndOffset(topicPartition: TopicPartition): Long = 1
+
+    override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(OffsetAndEpoch(1, 0))
+
+    override protected def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = Map.empty
+
+    override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = Map.empty
+
+    override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = 1
+
+    override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = 1
+
+    override protected val isOffsetForLeaderEpochSupported: Boolean = false
+    override protected val isTruncationOnFetchSupported: Boolean = false
+  }
+
 }