Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/16 21:58:08 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher

hachikuji commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r689886267



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -268,7 +262,10 @@ abstract class AbstractFetcherThread(name: String,
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
-    fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
+    // Partitions may have been removed from the fetcher while the thread was waiting for fetch
+    // response. Filter out removed partitions while holding `partitionMapLock` to ensure that we
+    // don't update state for any partition that may have already been migrated to another thread.
+    fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }.forKeyValue { (tp, leaderEpochOffset) =>

Review comment:
       Related to David's comment, but I think the `filter` here still builds an intermediate collection. Alternatively, we could move the `partitionStates.contains(tp)` check into the `forKeyValue` body. It might even be useful to have a `trace`-level log message in the `else` case when we ignore the result.
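
       A minimal sketch of what that could look like, written as a standalone example rather than against the real class. The case classes, the `DivergingEpochSketch` object, the `processFetchedEpochs` signature and the `trace` callback are all hypothetical stand-ins, plain `foreach` replaces `forKeyValue`, and the sketch assumes the caller already holds `partitionMapLock`:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Kafka types; not the real classes.
final case class TopicPartition(topic: String, partition: Int)
final case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

object DivergingEpochSketch {
  // Walks the fetched epochs once, without building a filtered copy.
  // Returns (partitions that would be updated, partitions that were ignored).
  // Assumption: the caller holds whatever lock guards `partitionStates`.
  def processFetchedEpochs(
    fetchedEpochs: Map[TopicPartition, EpochEndOffset],
    partitionStates: mutable.Set[TopicPartition],
    trace: String => Unit
  ): (Set[TopicPartition], Set[TopicPartition]) = {
    val updated = mutable.Set.empty[TopicPartition]
    val ignored = mutable.Set.empty[TopicPartition]

    fetchedEpochs.foreach { case (tp, leaderEpochOffset) =>
      if (partitionStates.contains(tp)) {
        // The real code would truncate / update fetch state here.
        updated += tp
      } else {
        // Partition was removed from this fetcher while the request was in flight.
        trace(s"Ignoring epoch offset $leaderEpochOffset for partition $tp " +
          "since it has been removed from this fetcher")
        ignored += tp
      }
    }

    (updated.toSet, ignored.toSet)
  }
}
```

       The membership check runs inside the single pass over `fetchedEpochs`, so no intermediate map is allocated, and the `else` branch gives a natural place for the log line.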



