You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/09/08 16:37:46 UTC

[kafka] branch 3.0 updated: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers (#11294)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 6a2fbea  KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers (#11294)
6a2fbea is described below

commit 6a2fbeaf8c50606996c635c192ad948697b3699a
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Sep 8 18:23:37 2021 +0200

    KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers (#11294)
    
     `ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes fails with the following error in the log:
    
    ```
    [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728. at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194) at kafka.server.AbstractFetcherThread.$anonfun [...]
    ```
    
    The issue is due to a race condition in `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created and populated before the partition is removed from the fetcher threads. This means that the fetch offset of the `InitialFetchState` could be outdated by the time the fetcher threads are restarted, because a fetcher thread that is still running could have appended to the log and advanced the log end offset in between.
    
    The patch fixes the issue by removing the partitions from the replica fetcher threads before creating the `InitialFetchState` for them.
    
    (cherry-picked from commit 06e53afbefaf3ad1f64a49607775bbcc85edb81e)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  79 ++++++++-------
 .../unit/kafka/server/ReplicaManagerTest.scala     | 108 ++++++++++++++++++++-
 2 files changed, 150 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 683e1a8..95491ed 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1687,6 +1687,8 @@ class ReplicaManager(val config: KafkaConfig,
         }
       }
 
+      // Stopping the fetchers must be done first in order to initialize the fetch
+      // position correctly.
       replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
       stateChangeLogger.info(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
         s"epoch $controllerEpoch with correlation id $correlationId for ${partitionsToMakeFollower.size} partitions")
@@ -2154,44 +2156,34 @@ class ReplicaManager(val config: KafkaConfig,
     stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
       "local followers.")
     val shuttingDown = isShuttingDown.get()
-    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, InitialFetchState]
+    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, Partition]
     val newFollowerTopicSet = new mutable.HashSet[String]
     newLocalFollowers.forKeyValue { case (tp, info) =>
       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
         try {
-          newFollowerTopicSet.add(tp.topic())
+          newFollowerTopicSet.add(tp.topic)
 
           if (shuttingDown) {
             stateChangeLogger.trace(s"Unable to start fetching ${tp} with topic " +
               s"ID ${info.topicId} because the replica manager is shutting down.")
           } else {
-            val listenerName = config.interBrokerListenerName.value()
             val leader = info.partition.leader
-            Option(newImage.cluster().broker(leader)).flatMap(_.node(listenerName).asScala) match {
-              case None =>
-                stateChangeLogger.trace(
-                  s"Unable to start fetching ${tp} with topic ID ${info.topicId} from leader " +
-                  s"${leader} because it is not alive."
-                )
-
-                // Create the local replica even if the leader is unavailable. This is required
-                // to ensure that we include the partition's high watermark in the checkpoint
-                // file (see KAFKA-1647)
-                partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
-              case Some(node) =>
-                val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-                if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))) {
-                  val leaderEndPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
-                  val log = partition.localLogOrException
-                  val fetchOffset = initialFetchOffset(log)
-                  partitionsToMakeFollower.put(tp,
-                    InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
-                } else {
-                  stateChangeLogger.info(
-                    s"Skipped the become-follower state change after marking its partition as " +
-                    s"follower for partition $tp with id ${info.topicId} and partition state $state."
-                  )
-                }
+            if (newImage.cluster.broker(leader) == null) {
+              stateChangeLogger.trace(s"Unable to start fetching $tp with topic ID ${info.topicId} " +
+                s"from leader $leader because it is not alive.")
+
+              // Create the local replica even if the leader is unavailable. This is required
+              // to ensure that we include the partition's high watermark in the checkpoint
+              // file (see KAFKA-1647).
+              partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
+            } else {
+              val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+              if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))) {
+                partitionsToMakeFollower.put(tp, partition)
+              } else {
+                stateChangeLogger.info("Skipped the become-follower state change after marking its " +
+                  s"partition as follower for partition $tp with id ${info.topicId} and partition state $state.")
+              }
             }
           }
           changedPartitions.add(partition)
@@ -2203,10 +2195,27 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
 
+    // Stopping the fetchers must be done first in order to initialize the fetch
+    // position correctly.
     replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
     stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
 
-    replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
+    val listenerName = config.interBrokerListenerName.value
+    val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
+    partitionsToMakeFollower.forKeyValue { (topicPartition, partition) =>
+      val node = partition.leaderReplicaIdOpt
+        .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
+        .flatMap(_.node(listenerName).asScala)
+        .getOrElse(Node.noNode)
+      val log = partition.localLogOrException
+      partitionAndOffsets.put(topicPartition, InitialFetchState(
+        new BrokerEndPoint(node.id, node.host, node.port),
+        partition.getLeaderEpoch,
+        initialFetchOffset(log)
+      ))
+    }
+
+    replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
     stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
 
     partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)
@@ -2215,15 +2224,15 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
-    stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
-      case (topicPartition, e) =>
-        if (e.isInstanceOf[KafkaStorageException]) {
+    stopPartitions(topicPartitions.map(tp => tp -> true).toMap).forKeyValue { (topicPartition, exception) =>
+      exception match {
+        case e: KafkaStorageException =>
           stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
-            "the local replica for the partition is in an offline log directory")
-        } else {
+            s"the local replica for the partition is in an offline log directory: ${e.getMessage}.")
+        case e: Throwable =>
           stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
             s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}", e)
-        }
+      }
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 71a1760..243f9ce 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1812,7 +1812,8 @@ class ReplicaManagerTest {
     timer: MockTimer,
     brokerId: Int = 0,
     aliveBrokerIds: Seq[Int] = Seq(0, 1),
-    propsModifier: Properties => Unit = _ => {}
+    propsModifier: Properties => Unit = _ => {},
+    mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
@@ -1837,7 +1838,24 @@ class ReplicaManagerTest {
       new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
       metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory,
       mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory, Option(this.getClass.getName),
-      alterIsrManager)
+      alterIsrManager) {
+
+      override protected def createReplicaFetcherManager(
+        metrics: Metrics,
+        time: Time,
+        threadNamePrefix: Option[String],
+        quotaManager: ReplicationQuotaManager
+      ): ReplicaFetcherManager = {
+        mockReplicaFetcherManager.getOrElse {
+          super.createReplicaFetcherManager(
+            metrics,
+            time,
+            threadNamePrefix,
+            quotaManager
+          )
+        }
+      }
+    }
   }
 
   @Test
@@ -3044,6 +3062,92 @@ class ReplicaManagerTest {
   }
 
 
+  @Test
+  def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = Mockito.mock(classOf[ReplicaFetcherManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+    )
+
+    try {
+      // The first call to removeFetcherForPartitions should be ignored.
+      Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+      // Make the local replica the follower
+      var followerTopicsDelta = topicsCreateDelta(localId, false)
+      var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+      assertEquals(0, followerPartition.localLogOrException.logEndOffset)
+
+      // Verify that addFetcherForPartitions was called with the correct
+      // init offset.
+      Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+        .addFetcherForPartitions(
+          Map(topicPartition -> InitialFetchState(
+            leader = BrokerEndPoint(otherId, "localhost", 9093),
+            currentLeaderEpoch = 0,
+            initOffset = 0
+          ))
+        )
+
+      // The second call to removeFetcherForPartitions simulates the case
+      // where the fetcher writes to the log before being shut down.
+      Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenAnswer { _ =>
+        replicaManager.getPartition(topicPartition) match {
+          case HostedPartition.Online(partition) =>
+            partition.appendRecordsToFollowerOrFutureReplica(
+              records = MemoryRecords.withRecords(CompressionType.NONE, 0,
+                new SimpleRecord("first message".getBytes)),
+              isFuture = false
+            )
+
+          case _ =>
+        }
+
+        Map.empty[TopicPartition, PartitionFetchState]
+      }
+
+      // Apply changes that bump the leader epoch.
+      followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false)
+      followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      assertFalse(followerPartition.isLeader)
+      assertEquals(1, followerPartition.getLeaderEpoch)
+      assertEquals(1, followerPartition.localLogOrException.logEndOffset)
+
+      // Verify that addFetcherForPartitions was called with the correct
+      // init offset.
+      Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+        .addFetcherForPartitions(
+          Map(topicPartition -> InitialFetchState(
+            leader = BrokerEndPoint(otherId, "localhost", 9093),
+            currentLeaderEpoch = 1,
+            initOffset = 1
+          ))
+        )
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
   private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): TopicsDelta = {
     val leader = if (isStartIdLeader) startId else startId + 1
     val delta = new TopicsDelta(TopicsImage.EMPTY)