Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/03 07:56:47 UTC

[GitHub] [kafka] tombentley commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r644571665



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
   @hachikuji as @guozhangwang pointed out, the scheduler really _is_ FIFO (it uses a sequence number internally to guarantee that order is maintained). So, assuming his theory about the racing I/O threads is correct (I think it is, though I've never observed the problem myself), his solution of holding the lock for `handleStopReplicaRequest` would work.
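
To illustrate the FIFO point, here is a minimal sketch of how a sequence number can keep submission order among tasks that are due at the same time. It is not Kafka's scheduler: `Task`, `FifoDelayQueue`, `submit` and `drain` are hypothetical names invented for this example.

```scala
import scala.collection.mutable

// Hypothetical task record: due time plus a submission sequence number.
final case class Task(dueMs: Long, seq: Long, name: String)

// Hypothetical queue, for illustration only (not Kafka's scheduler).
final class FifoDelayQueue {
  private var nextSeq = 0L

  // mutable.PriorityQueue is a max-heap, so reverse the ordering to dequeue
  // the earliest due time first; ties are broken by the lower sequence number.
  private val heap = mutable.PriorityQueue.empty[Task](
    Ordering.by[Task, (Long, Long)](t => (t.dueMs, t.seq)).reverse)

  // Assign the sequence number under the same lock as the enqueue, so that
  // sequence order matches submission order.
  def submit(name: String, dueMs: Long): Unit = synchronized {
    heap.enqueue(Task(dueMs, nextSeq, name))
    nextSeq += 1
  }

  // Remove every task, returning names in execution order.
  def drain(): List[String] = synchronized {
    val out = List.newBuilder[String]
    while (heap.nonEmpty) out += heap.dequeue().name
    out.result()
  }
}
```

With this ordering, two tasks submitted with the same due time come out in the order they went in. That only helps if the submissions themselves happen in the intended order, which is where the racing I/O threads would matter.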
   
   The current version of the PR makes no assumptions about how the reordering happens (i.e. whether it is caused by the inconsistent locking or by anything else). So I don't think you're missing anything; you've just solved the problem differently.
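
As a rough sketch of the callback shape in the diff above: the caller passes a function into the state-changing method instead of acting on the result afterwards. The sketch assumes the callback runs while the state-change lock is still held, which the diff excerpt does not itself show. `ReplicaStateSketch` and all of its members are hypothetical and do not mirror `ReplicaManager`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a replica manager; illustration only.
final class ReplicaStateSketch {
  private val stateChangeLock = new Object
  private val log = mutable.ArrayBuffer.empty[String]

  // Assumption: the callback is invoked before the lock is released, so the
  // stop and its follow-up action cannot interleave with another state change.
  def stopReplicas(partitions: Seq[Int], onStopped: Seq[Int] => Unit): Unit =
    stateChangeLock.synchronized {
      partitions.foreach(p => log += s"stop-$p")
      onStopped(partitions)
    }

  def becomeLeader(partition: Int, onLeader: Int => Unit): Unit =
    stateChangeLock.synchronized {
      log += s"lead-$partition"
      onLeader(partition)
    }

  // JVM monitors are reentrant, so callbacks may call this while the lock is held.
  def record(event: String): Unit = stateChangeLock.synchronized { log += event }

  def events: List[String] = stateChangeLock.synchronized { log.toList }
}
```

Under that assumption, each follow-up action is recorded immediately after the state change that triggered it, whichever thread handled the request.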
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org