You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/03 12:28:04 UTC

[GitHub] [kafka] dajac opened a new pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

dajac opened a new pull request #11294:
URL: https://github.com/apache/kafka/pull/11294


    `ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes fails with the following error in the log:
   
   ```
   [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728. at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532) at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scal
 a:359) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355) at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
   ```
   
   The issue is due to a race condition in `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created and populated before the partition is removed from the fetcher threads. This means that the fetch offset of the `InitialFetchState` could be outdated when the fetcher threads are re-started because the fetcher threads could have incremented the log end offset in between.
   
   The patch fixes the issue by removing the partitions from the replica fetcher threads before creating the `InitialFetchState` for them.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#issuecomment-915005946


   @hachikuji Thanks for your review. I have addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#issuecomment-915393112


   Merged to master and to 3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#discussion_r704137955



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3242,6 +3260,92 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = Mockito.mock(classOf[ReplicaFetcherManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+    )
+
+    try {
+      // The first call to removeFetcherForPartitions should be ignored.
+      Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+      // Make the local replica the follower
+      var followerTopicsDelta = topicsCreateDelta(localId, false)
+      var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+      assertEquals(0, followerPartition.localLogOrException.logEndOffset)
+
+      // Verify that addFetcherForPartitions was called with the correct
+      // init offset.
+      Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+        .addFetcherForPartitions(
+          Map(topicPartition -> InitialFetchState(
+            leader = BrokerEndPoint(otherId, "localhost", 9093),
+            currentLeaderEpoch = 0,
+            initOffset = 0
+          ))
+        )
+
+      // The second call to removeFetcherForPartitions simulate the case
+      // where the fetcher write to the log before being shutdown.
+      Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenAnswer { _ =>
+        replicaManager.getPartition(topicPartition) match {
+          case HostedPartition.Online(partition) =>
+            partition.appendRecordsToFollowerOrFutureReplica(
+              records = MemoryRecords.withRecords(CompressionType.NONE, 0,
+                new SimpleRecord("first message".getBytes)),
+              isFuture = false
+            )
+
+          case _ =>
+        }
+
+        Map.empty[TopicPartition, PartitionFetchState]
+      }
+
+      // Apply changes that bumps the leader epoch.
+      followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false)
+      followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      assertFalse(followerPartition.isLeader)
+      assertEquals(1, followerPartition.getLeaderEpoch)
+      assertEquals(1, followerPartition.localLogOrException.logEndOffset)
+
+      // Verify that addFetcherForPartitions was called with the correct
+      // init offset.
+      Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+        .addFetcherForPartitions(
+          Map(topicPartition -> InitialFetchState(
+            leader = BrokerEndPoint(otherId, "localhost", 9093),
+            currentLeaderEpoch = 1,
+            initOffset = 1
+          ))
+        )
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)

Review comment:
       I was also thinking about this as most of the tests have it but wanted to test/refactor this separately if you don't mind. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac merged pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11294:
URL: https://github.com/apache/kafka/pull/11294


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#discussion_r703942630



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -3242,6 +3260,92 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = Mockito.mock(classOf[ReplicaFetcherManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+    )
+
+    try {
+      // The first call to removeFetcherForPartitions should be ignored.
+      Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+      // Make the local replica the follower
+      var followerTopicsDelta = topicsCreateDelta(localId, false)
+      var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+      assertEquals(0, followerPartition.localLogOrException.logEndOffset)
+
+      // Verify that addFetcherForPartitions was called with the correct
+      // init offset.
+      Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+        .addFetcherForPartitions(
+          Map(topicPartition -> InitialFetchState(
+            leader = BrokerEndPoint(otherId, "localhost", 9093),
+            currentLeaderEpoch = 0,
+            initOffset = 0
+          ))
+        )
+
+      // The second call to removeFetcherForPartitions simulate the case
+      // where the fetcher write to the log before being shutdown.
+      Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+        Set(topicPartition))
+      ).thenAnswer { _ =>
+        replicaManager.getPartition(topicPartition) match {
+          case HostedPartition.Online(partition) =>
+            partition.appendRecordsToFollowerOrFutureReplica(
+              records = MemoryRecords.withRecords(CompressionType.NONE, 0,
+                new SimpleRecord("first message".getBytes)),
+              isFuture = false
+            )
+
+          case _ =>
+        }
+
+        Map.empty[TopicPartition, PartitionFetchState]
+      }
+
+      // Apply changes that bumps the leader epoch.
+      followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false)
+      followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      assertFalse(followerPartition.isLeader)
+      assertEquals(1, followerPartition.getLeaderEpoch)
+      assertEquals(1, followerPartition.localLogOrException.logEndOffset)
+
+      // Verify that addFetcherForPartitions was called with the correct
+      // init offset.
+      Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+        .addFetcherForPartitions(
+          Map(topicPartition -> InitialFetchState(
+            leader = BrokerEndPoint(otherId, "localhost", 9093),
+            currentLeaderEpoch = 1,
+            initOffset = 1
+          ))
+        )
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)

Review comment:
       nit: is it worthwhile moving this to an `@AfterEach`?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2220,7 +2210,22 @@ class ReplicaManager(val config: KafkaConfig,
     replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)

Review comment:
       Perhaps it's worth a comment here that stopping the fetchers is required so that we can initialize the fetch position correctly. It's a subtle point which we might miss again in the future.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2229,15 +2234,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
-    stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
-      case (topicPartition, e) =>
-        if (e.isInstanceOf[KafkaStorageException]) {
-          stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
-            "the local replica for the partition is in an offline log directory")
-        } else {
-          stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
-            s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}", e)
-        }
+    stopPartitions(topicPartitions.map(tp => tp -> true).toMap).forKeyValue { (topicPartition, e) =>
+      if (e.isInstanceOf[KafkaStorageException]) {
+        stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
+          "the local replica for the partition is in an offline log directory")

Review comment:
       nit: I was kind of wondering if it's worthwhile logging the exception message in here. We do construct the exception with different messages in different scenarios.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2229,15 +2234,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
-    stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
-      case (topicPartition, e) =>
-        if (e.isInstanceOf[KafkaStorageException]) {
-          stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
-            "the local replica for the partition is in an offline log directory")
-        } else {
-          stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " +
-            s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}", e)
-        }
+    stopPartitions(topicPartitions.map(tp => tp -> true).toMap).forKeyValue { (topicPartition, e) =>
+      if (e.isInstanceOf[KafkaStorageException]) {

Review comment:
       nit: could we do this with a `match` instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org