You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/24 03:45:05 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

showuon commented on a change in pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#discussion_r755680018



##########
File path: core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
##########
@@ -394,6 +394,23 @@ class ReplicaStateMachineTest {
   }
 
   @Test
+  def testInvalidOnlineReplicaTransitionWithQueuedDeletion(): Unit = {
+    controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
+    controllerContext.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(Seq(brokerId)))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+    controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+    controllerContext.queueTopicDeletion(Set(replica.topic))
+    controllerContext.putReplicaState(replica, OnlineReplica)
+    // we don't expect addLeaderAndIsrRequestForBrokers to be called for this partition
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))

Review comment:
       I don't think the test here is correct. You put the ReplicaState to `OnlineReplica`, and then verify the replicaState is `OnlineReplica`. I think it makes more sense if you can remove the line `controllerContext.putReplicaState(replica, OnlineReplica)`. What do you think?
   
   PS. I removed the linei and ran it in my local env, and it still pass. FYI

##########
File path: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
##########
@@ -204,13 +204,17 @@ class ZkReplicaStateMachine(config: KafkaConfig,
                 controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment)
               }
             case _ =>
-              controllerContext.partitionLeadershipInfo(partition) match {
-                case Some(leaderIsrAndControllerEpoch) =>
-                  controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
-                    replica.topicPartition,
-                    leaderIsrAndControllerEpoch,
-                    controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
-                case None =>
+              if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
+                controllerContext.partitionLeadershipInfo(partition) match {
+                  case Some(leaderIsrAndControllerEpoch) =>
+                    controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
+                      replica.topicPartition,
+                      leaderIsrAndControllerEpoch,
+                      controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
+                  case None =>
+                }
+              } else {
+                info(s"OnlineReplica is in deleting state $partition. Not adding it to the LeaderAndIsrRequest")

Review comment:
       The log here is not clear enough. How about this:
   
   "OnlineReplica is in deleting state $partition. Not adding it to the LeaderAndIsrRequest" ->
   "Replica $replicaId for partition $partition is in deleting state. Not adding it to the LeaderAndIsrRequest."
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org