Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/18 08:51:25 UTC

[GitHub] [kafka] dajac opened a new pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

dajac opened a new pull request #11225:
URL: https://github.com/apache/kafka/pull/11225


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r691703790



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>
             val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
-            /*
-           * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
-           * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
-           * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
-           * we need to map this topic-partition to OfflinePartition instead.
-           */
+            // If there is offline log directory, a Partition object may have been created by getOrCreatePartition()

Review comment:
       nit: If there is _an_ offline log directory?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>

Review comment:
       This logic is a bit strange. It sounds like we are trying to handle the case where we fail to create the log after we have already created the partition. Would it make more sense to handle this in `getOrCreatePartition` if an exception is raised?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>
             val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
-            /*
-           * If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
-           * before getOrCreateReplica() failed to create local replica due to KafkaStorageException.
-           * In this case ReplicaManager.allPartitions will map this topic-partition to an empty Partition object.
-           * we need to map this topic-partition to OfflinePartition instead.
-           */
+            // If there is offline log directory, a Partition object may have been created by getOrCreatePartition()
+            // before getOrCreateReplica() failed to create local replica due to KafkaStorageException.

Review comment:
       Hmm, `getOrCreateReplica` no longer exists. Maybe `createLogIfNotExists` is the replacement?




[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r694960197



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig,
                     InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
                 } else {
                   stateChangeLogger.info(
-                    s"Skipped the become-follower state change after marking its partition as " +
+                    "Skipped the become-follower state change after marking its partition as " +
                     s"follower for partition $tp with id ${info.topicId} and partition state $state."
                   )
                 }
             }
           }
           changedPartitions.add(partition)
         } catch {
-          case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
+          case e: KafkaStorageException =>
+            // If there is an offline log directory, a Partition object may have been created by
+            // `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due
+            // to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition
+            // to an empty Partition object. We need to map this topic-partition to OfflinePartition instead.
+            markPartitionOffline(tp)
+            stateChangeLogger.error(s"Unable to start fetching $tp " +
+              s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e)
+            replicaFetcherManager.addFailedPartition(tp)
+            error(s"Error while making broker the follower for partition $tp in dir " +

Review comment:
       No... I only added it because the other cases have it. I have removed all of them.




[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r694960458



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig,
                     InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
                 } else {
                   stateChangeLogger.info(
-                    s"Skipped the become-follower state change after marking its partition as " +
+                    "Skipped the become-follower state change after marking its partition as " +
                     s"follower for partition $tp with id ${info.topicId} and partition state $state."
                   )
                 }
             }
           }
           changedPartitions.add(partition)
         } catch {
-          case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
+          case e: KafkaStorageException =>
+            // If there is an offline log directory, a Partition object may have been created by
+            // `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due
+            // to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition
+            // to an empty Partition object. We need to map this topic-partition to OfflinePartition instead.
+            markPartitionOffline(tp)

Review comment:
       Good point. I've added a few unit tests.




[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r693979039



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>

Review comment:
       @hachikuji @showuon I have refactored this part. I think that we could remove this check and instead mark the partition offline when a `KafkaStorageException` occurs. Does that make sense?
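A minimal, self-contained sketch of the approach proposed here. It assumes heavily simplified stand-ins: `KafkaStorageException`, `HostedPartition`, `Partition`, and `MiniReplicaManager` below are hypothetical models that only borrow names from the thread and are not the real `ReplicaManager` API. The storage error is caught where it happens and the partition is marked offline immediately, instead of re-scanning the request afterwards for partitions left in an empty state.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's KafkaStorageException.
final class KafkaStorageException(msg: String) extends RuntimeException(msg)

// Simplified model of how a broker tracks a hosted partition.
sealed trait HostedPartition
object HostedPartition {
  final case class Online(partition: Partition) extends HostedPartition
  case object Offline extends HostedPartition
}

final class Partition(val tp: String) {
  var hasLog: Boolean = false

  // Fails when the partition's log directory is offline.
  def createLogIfNotExists(dirOnline: Boolean): Unit = {
    if (!dirOnline) throw new KafkaStorageException(s"Log dir for $tp is offline")
    hasLog = true
  }
}

final class MiniReplicaManager(partitionsOnOfflineDirs: Set[String]) {
  val allPartitions = mutable.Map.empty[String, HostedPartition]

  private def getOrCreatePartition(tp: String): Partition =
    allPartitions.get(tp) match {
      case Some(HostedPartition.Online(p)) => p
      case _ =>
        val p = new Partition(tp)
        allPartitions(tp) = HostedPartition.Online(p)
        p
    }

  def markPartitionOffline(tp: String): Unit = {
    allPartitions(tp) = HostedPartition.Offline
  }

  // Returns the partitions whose become-follower transition succeeded.
  def makeFollowers(tps: Seq[String]): Set[String] = {
    val changed = mutable.Set.empty[String]
    tps.foreach { tp =>
      try {
        val partition = getOrCreatePartition(tp)
        partition.createLogIfNotExists(dirOnline = !partitionsOnOfflineDirs.contains(tp))
        changed += tp
      } catch {
        case _: KafkaStorageException =>
          // The Partition was registered before log creation failed; replace it
          // with Offline right here instead of re-scanning the request later.
          markPartitionOffline(tp)
      }
    }
    changed.toSet
  }
}
```

Handling the error at the point of failure means no second pass over the request's partition states is needed to find partitions that were registered but never got a log.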




[GitHub] [kafka] dajac merged pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11225:
URL: https://github.com/apache/kafka/pull/11225


   


[GitHub] [kafka] showuon commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r693322503



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>

Review comment:
       There's actually another copy of this code doing the same thing at `ReplicaManager.scala:2097`. I think we can extract it into a method to avoid code duplication.
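One way to act on this suggestion, sketched with hypothetical names: factor the "catch the storage error and mark the partition offline" logic into a single helper that both the become-leader and become-follower paths call. `PartitionStates`, `handleStorageErrors`, and the string-valued state map are illustrative assumptions, not code from the PR.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's KafkaStorageException.
final class KafkaStorageException(msg: String) extends RuntimeException(msg)

// Deliberately tiny model: "online" or "offline" per topic-partition.
final class PartitionStates {
  val state = mutable.Map.empty[String, String]

  def markPartitionOffline(tp: String): Unit = {
    state(tp) = "offline"
  }

  // Shared helper: runs one state change and, on a storage error, marks the
  // partition offline, so callers no longer duplicate this logic.
  def handleStorageErrors(tp: String)(stateChange: => Unit): Boolean =
    try {
      stateChange
      state(tp) = "online"
      true
    } catch {
      case _: KafkaStorageException =>
        markPartitionOffline(tp)
        false
    }
}

val states = new PartitionStates
// The become-leader and become-follower paths can both call the same helper.
val leaderOk = states.handleStorageErrors("foo-0") { () }
val followerOk = states.handleStorageErrors("foo-1") {
  throw new KafkaStorageException("log dir offline")
}
```

Taking the state change as a by-name parameter keeps each call site's own logic in place while the error handling lives in one spot.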




[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r694260532



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig,
                     InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
                 } else {
                   stateChangeLogger.info(
-                    s"Skipped the become-follower state change after marking its partition as " +
+                    "Skipped the become-follower state change after marking its partition as " +
                     s"follower for partition $tp with id ${info.topicId} and partition state $state."
                   )
                 }
             }
           }
           changedPartitions.add(partition)
         } catch {
-          case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
+          case e: KafkaStorageException =>
+            // If there is an offline log directory, a Partition object may have been created by
+            // `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due
+            // to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition
+            // to an empty Partition object. We need to map this topic-partition to OfflinePartition instead.
+            markPartitionOffline(tp)

Review comment:
       Our test coverage seems a bit lacking. How much effort would it be to try and cover all these cases where the partition gets marked offline? As far as I can tell, there are no tests today which verify the partition gets marked offline in any of these cases.
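A real test would drive the actual `ReplicaManager`. The tiny model below is a hypothetical stand-in (`FakeBroker` and its methods are not Kafka classes) that only illustrates what such a test asserts: after a storage failure during the become-follower transition, the partition is reported offline and is not in the set of changed partitions.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's KafkaStorageException.
final class KafkaStorageException(msg: String) extends RuntimeException(msg)

// Hypothetical fake: log creation fails for partitions on an offline log dir.
final class FakeBroker(failingPartitions: Set[String]) {
  val offline = mutable.Set.empty[String]
  val changed = mutable.Set.empty[String]

  private def createLogIfNotExists(tp: String): Unit =
    if (failingPartitions.contains(tp))
      throw new KafkaStorageException(s"Cannot create log for $tp")

  def becomeFollower(tps: Seq[String]): Unit =
    tps.foreach { tp =>
      try {
        createLogIfNotExists(tp)
        changed += tp
      } catch {
        case _: KafkaStorageException => offline += tp
      }
    }
}

// The shape of the missing test: drive a storage failure, then assert that
// the partition is reported offline and excluded from the changed set.
def testStorageErrorMarksPartitionOffline(): Unit = {
  val broker = new FakeBroker(failingPartitions = Set("foo-1"))
  broker.becomeFollower(Seq("foo-0", "foo-1"))
  assert(broker.offline == Set("foo-1"), "failed partition should be offline")
  assert(broker.changed == Set("foo-0"), "only healthy partitions change state")
}

testStorageErrorMarksPartitionOffline()
```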




[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r691041467



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>

Review comment:
       I wonder if we really have to re-iterate over all the partition states present in the request here. Intuitively, I would have thought that considering only the ones in `partitionStates` would be sufficient.
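To illustrate the idea with a hypothetical model: if the request carries states for many partitions but only a validated subset ends up in `partitionStates`, a follow-up pass only needs to visit that subset. `PartitionState`, `validStates`, and `followUpPass` are illustrative names, and the "newer leader epoch" filter is a simplified stand-in for the broker's real validation.

```scala
// Hypothetical, simplified partition state carried by a LeaderAndIsr request.
final case class PartitionState(topic: String, partition: Int, leaderEpoch: Int)

// Validation pass (stand-in for the real checks): keep only states whose
// leader epoch is newer than the one the broker already knows.
def validStates(
    requestStates: Seq[PartitionState],
    currentEpochs: Map[(String, Int), Int]
): Map[(String, Int), PartitionState] =
  requestStates.collect {
    case s if s.leaderEpoch > currentEpochs.getOrElse((s.topic, s.partition), -1) =>
      (s.topic, s.partition) -> s
  }.toMap

// Follow-up pass over the validated subset only, rather than re-iterating
// over every partition state in the request.
def followUpPass(partitionStates: Map[(String, Int), PartitionState]): Seq[String] =
  partitionStates.keys.toSeq.sorted.map { case (topic, partition) => s"$topic-$partition" }
```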




[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r691706079



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>

Review comment:
       This logic is a bit strange. It sounds like we are trying to handle the case where we fail to create the log after we have already created the partition. Would it make more sense to handle this in `getOrCreatePartition` if an exception is raised?




[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r693973004



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1436,17 +1438,15 @@ class ReplicaManager(val config: KafkaConfig,
 
           leaderAndIsrRequest.partitionStates.forEach { partitionState =>

Review comment:
       `getOrCreatePartition` is not even called in this particular case, and in any case `createLogIfNotExists` is called later, so handling the exception there would not work.
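A minimal sketch of the ordering problem described here, using hypothetical simplified functions that only borrow the names from the thread: the partition is registered first, and the storage error surfaces later during log creation, so a `try`/`catch` inside `getOrCreatePartition` would never see it.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's KafkaStorageException.
final class KafkaStorageException(msg: String) extends RuntimeException(msg)

val registered = mutable.Map.empty[String, String]

// A try/catch here would be useless: this method only registers the
// partition in memory and returns before any disk access happens.
def getOrCreatePartition(tp: String): Unit = {
  registered.getOrElseUpdate(tp, "online")
  ()
}

// The disk access, and therefore the storage error, happens afterwards.
def createLogIfNotExists(tp: String, dirOnline: Boolean): Unit =
  if (!dirOnline) throw new KafkaStorageException(s"Cannot create log for $tp")

// So the caller is the first place that can observe the failure and mark
// the already-registered partition offline.
def makeFollower(tp: String, dirOnline: Boolean): Unit =
  try {
    getOrCreatePartition(tp)
    createLogIfNotExists(tp, dirOnline)
  } catch {
    case _: KafkaStorageException => registered(tp) = "offline"
  }
```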




[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r694260532



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig,
                     InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
                 } else {
                   stateChangeLogger.info(
-                    s"Skipped the become-follower state change after marking its partition as " +
+                    "Skipped the become-follower state change after marking its partition as " +
                     s"follower for partition $tp with id ${info.topicId} and partition state $state."
                   )
                 }
             }
           }
           changedPartitions.add(partition)
         } catch {
-          case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
+          case e: KafkaStorageException =>
+            // If there is an offline log directory, a Partition object may have been created by
+            // `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due
+            // to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition
+            // to an empty Partition object. We need to map this topic-partition to OfflinePartition instead.
+            markPartitionOffline(tp)

Review comment:
       Our test coverage seems a bit lacking. How much effort would it be to try and cover all these cases where the partition gets marked offline? As far as I can tell, there are no tests today which verify the partition gets marked offline in any of these cases.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig,
                     InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
                 } else {
                   stateChangeLogger.info(
-                    s"Skipped the become-follower state change after marking its partition as " +
+                    "Skipped the become-follower state change after marking its partition as " +
                     s"follower for partition $tp with id ${info.topicId} and partition state $state."
                   )
                 }
             }
           }
           changedPartitions.add(partition)
         } catch {
-          case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
+          case e: KafkaStorageException =>
+            // If there is an offline log directory, a Partition object may have been created by
+            // `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due
+            // to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition
+            // to an empty Partition object. We need to map this topic-partition to OfflinePartition instead.
+            markPartitionOffline(tp)
+            stateChangeLogger.error(s"Unable to start fetching $tp " +
+              s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e)
+            replicaFetcherManager.addFailedPartition(tp)
+            error(s"Error while making broker the follower for partition $tp in dir " +

Review comment:
       Do you think this log message adds anything beyond what is in the state change message above?
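A small sketch of the logging shape being discussed, under stated assumptions: `RecordingLogger`, `failedPartitions`, and `startFetching` are hypothetical stand-ins for `stateChangeLogger`, `replicaFetcherManager.addFailedPartition`, and the surrounding `try` block, and the generic `Throwable` fallback is assumed rather than taken from the quoted hunk. Each storage failure produces one state-change error line and no extra `error(...)` call.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's KafkaStorageException.
final class KafkaStorageException(msg: String) extends RuntimeException(msg)

// Hypothetical logger that records messages so the number of log lines is visible.
final class RecordingLogger {
  val errors = mutable.ArrayBuffer.empty[String]
  def error(msg: String): Unit = { errors += msg }
}

val stateChangeLogger = new RecordingLogger
// Stand-in for the fetcher manager's failed-partition bookkeeping.
val failedPartitions = mutable.Set.empty[String]

def startFetching(tp: String)(body: => Unit): Unit =
  try body
  catch {
    // Most specific case first: a storage error produces exactly one
    // state-change log line, with no second, redundant error log.
    case e: KafkaStorageException =>
      stateChangeLogger.error(s"Unable to start fetching $tp due to a storage error ${e.getMessage}")
      failedPartitions += tp
    // Assumed generic fallback; the quoted hunk does not show what follows.
    case e: Throwable =>
      stateChangeLogger.error(s"Unable to start fetching $tp due to ${e.getMessage}")
  }
```

Ordering matters here: because `KafkaStorageException` is a `Throwable`, placing the generic case first would swallow storage errors before the offline handling runs.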




[GitHub] [kafka] hachikuji commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r696142107



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2207,15 +2191,27 @@ class ReplicaManager(val config: KafkaConfig,
                     InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
                 } else {
                   stateChangeLogger.info(
-                    s"Skipped the become-follower state change after marking its partition as " +
+                    "Skipped the become-follower state change after marking its partition as " +
                     s"follower for partition $tp with id ${info.topicId} and partition state $state."
                   )
                 }
             }
           }
           changedPartitions.add(partition)
         } catch {
-          case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
+          case e: KafkaStorageException =>
+            stateChangeLogger.error(s"Unable to start fetching $tp " +
+              s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e)
+            replicaFetcherManager.addFailedPartition(tp)
+            // If there is an offline log directory, a Partition object may have been created by
+            // `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due
+            // to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition
+            // to an empty Partition object. We need to map this topic-partition to OfflinePartition instead.
+            markPartitionOffline(tp)
+
+

Review comment:
       nit: extra newline



