Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/04 05:48:26 UTC

[GitHub] [kafka] lbradstreet opened a new pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

lbradstreet opened a new pull request #11376:
URL: https://github.com/apache/kafka/pull/11376


   In some cases, a broker may go offline during a topic deletion, before
   its replica has moved to the OfflineReplica state. When the broker comes
   back up, the controller sends it a LeaderAndIsrRequest containing the
   partition even though the topic is already queued for deletion in the controller.
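A minimal sketch of the guard this PR adds, assuming a heavily simplified model: `ControllerContextModel`, `leaderAndIsrTargets` and the local `TopicPartition`/`LeaderAndIsr` case classes are hypothetical stand-ins, not Kafka's classes. It only illustrates the decision: a replica moving to OnlineReplica yields a LeaderAndIsrRequest entry when its topic is not queued for deletion and leadership info exists for the partition.

```scala
// Hypothetical, simplified model of the controller-side guard; none of these
// types are Kafka's, they only mirror the shape of the change under review.
object LeaderAndIsrGuardSketch {
  final case class TopicPartition(topic: String, partition: Int) {
    override def toString: String = s"$topic-$partition"
  }

  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Stand-in for the controller context: cached leadership plus the deletion queue.
  final class ControllerContextModel(
      val leadership: Map[TopicPartition, LeaderAndIsr],
      val topicsQueuedForDeletion: Set[String]) {
    def isTopicQueuedUpForDeletion(topic: String): Boolean =
      topicsQueuedForDeletion.contains(topic)
  }

  // Returns the (brokerId, partition) pairs that would be added to the
  // LeaderAndIsrRequest batch when these replicas move to OnlineReplica.
  def leaderAndIsrTargets(ctx: ControllerContextModel,
                          replicas: Seq[(Int, TopicPartition)]): Seq[(Int, TopicPartition)] =
    replicas.flatMap { case (replicaId, partition) =>
      if (ctx.isTopicQueuedUpForDeletion(partition.topic)) {
        // Topic is being deleted: skip it instead of re-announcing the replica.
        Seq.empty[(Int, TopicPartition)]
      } else {
        // Without leadership info nothing is added (like the `case None =>` branch).
        ctx.leadership.get(partition).map(_ => (replicaId, partition)).toSeq
      }
    }
}
```

Checking the deletion queue before looking up leadership mirrors the ordering in the diff, so a deleting topic never reaches the request batch even if its leadership info is still cached.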


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#discussion_r800202601



##########
File path: core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
##########
@@ -394,6 +394,23 @@ class ReplicaStateMachineTest {
   }
 
   @Test
+  def testInvalidOnlineReplicaTransitionWithQueuedDeletion(): Unit = {
+    controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
+    controllerContext.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(Seq(brokerId)))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+    controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+    controllerContext.queueTopicDeletion(Set(replica.topic))
+    controllerContext.putReplicaState(replica, OnlineReplica)
+    // we don't expect addLeaderAndIsrRequestForBrokers to be called for this partition
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))

Review comment:
       Makes sense, well noticed.







[GitHub] [kafka] lbradstreet commented on pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#issuecomment-977323145


   @dajac I forgot about this change. Do you have some time to review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#discussion_r830724959



##########
File path: core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
##########
@@ -394,6 +394,22 @@ class ReplicaStateMachineTest {
   }
 
   @Test
+  def testInvalidOnlineReplicaTransitionWithQueuedDeletion(): Unit = {
+    controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
+    controllerContext.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(Seq(brokerId)))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+    controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())

Review comment:
       We've replaced EasyMock with Mockito in the latest trunk. Could you rebase to the latest trunk and update the test accordingly? Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#discussion_r755680018



##########
File path: core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
##########
@@ -394,6 +394,23 @@ class ReplicaStateMachineTest {
   }
 
   @Test
+  def testInvalidOnlineReplicaTransitionWithQueuedDeletion(): Unit = {
+    controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
+    controllerContext.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(Seq(brokerId)))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+    controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+    controllerContext.queueTopicDeletion(Set(replica.topic))
+    controllerContext.putReplicaState(replica, OnlineReplica)
+    // we don't expect addLeaderAndIsrRequestForBrokers to be called for this partition
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))

Review comment:
       I don't think the test here is correct. You set the replica state to `OnlineReplica` and then verify that the replica state is `OnlineReplica`, so the assertion passes regardless of what `handleStateChanges` does. I think it makes more sense to remove the line `controllerContext.putReplicaState(replica, OnlineReplica)`. What do you think?
   
   PS. I removed the line and ran the test in my local env, and it still passes. FYI
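To illustrate the reviewer's point, here is a hypothetical miniature of the test; `StateMachineModel`, `handleOnline` and `runTest` are invented names for this sketch, not Kafka's `ReplicaStateMachine` API. The replica starts in `ReplicaDeletionIneligible` and is never manually set to `OnlineReplica`, so the final state check actually observes the transition.

```scala
// Hypothetical miniature of the test under discussion; not Kafka's ReplicaStateMachine.
object OnlineTransitionTestSketch {
  sealed trait ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object OnlineReplica extends ReplicaState

  final class StateMachineModel(deletingTopics: Set[String]) {
    private var states = Map.empty[String, ReplicaState]
    // Records the topics a LeaderAndIsrRequest would have been queued for.
    val leaderAndIsrSentFor = scala.collection.mutable.ListBuffer.empty[String]

    def putReplicaState(topic: String, state: ReplicaState): Unit = states += topic -> state
    def replicaState(topic: String): ReplicaState = states(topic)

    // Moves the replica to OnlineReplica, skipping the request for deleting topics.
    def handleOnline(topic: String): Unit = {
      if (!deletingTopics.contains(topic)) leaderAndIsrSentFor += topic
      states += topic -> OnlineReplica
    }
  }

  // Follows the reviewer's suggestion: no putReplicaState(OnlineReplica) before the call.
  def runTest(): (ReplicaState, List[String]) = {
    val sm = new StateMachineModel(deletingTopics = Set("t"))
    sm.putReplicaState("t", ReplicaDeletionIneligible)
    sm.handleOnline("t")
    (sm.replicaState("t"), sm.leaderAndIsrSentFor.toList)
  }
}
```

If `putReplicaState("t", OnlineReplica)` were called before `handleOnline`, the state assertion would pass even if `handleOnline` did nothing, which is exactly the weakness the reviewer describes.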

##########
File path: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
##########
@@ -204,13 +204,17 @@ class ZkReplicaStateMachine(config: KafkaConfig,
                 controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment)
               }
             case _ =>
-              controllerContext.partitionLeadershipInfo(partition) match {
-                case Some(leaderIsrAndControllerEpoch) =>
-                  controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
-                    replica.topicPartition,
-                    leaderIsrAndControllerEpoch,
-                    controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
-                case None =>
+              if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
+                controllerContext.partitionLeadershipInfo(partition) match {
+                  case Some(leaderIsrAndControllerEpoch) =>
+                    controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
+                      replica.topicPartition,
+                      leaderIsrAndControllerEpoch,
+                      controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
+                  case None =>
+                }
+              } else {
+                info(s"OnlineReplica is in deleting state $partition. Not adding it to the LeaderAndIsrRequest")

Review comment:
       The log here is not clear enough. How about this:
   
   "OnlineReplica is in deleting state $partition. Not adding it to the LeaderAndIsrRequest" ->
   "Replica $replicaId for partition $partition is in deleting state. Not adding it to the LeaderAndIsrRequest."
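A small sketch of how the suggested wording renders, assuming a local `TopicPartition` stand-in whose `toString` follows Kafka's `topic-partition` format; `skippedReplicaMessage` is a hypothetical helper, not part of the PR.

```scala
// Hypothetical helper rendering the reviewer's suggested log message.
object DeletingStateLogSketch {
  // Local stand-in; toString mimics Kafka's "topic-partition" rendering.
  final case class TopicPartition(topic: String, partition: Int) {
    override def toString: String = s"$topic-$partition"
  }

  def skippedReplicaMessage(replicaId: Int, partition: TopicPartition): String =
    s"Replica $replicaId for partition $partition is in deleting state. " +
      "Not adding it to the LeaderAndIsrRequest."
}
```

For example, `skippedReplicaMessage(1, TopicPartition("foo", 0))` yields "Replica 1 for partition foo-0 is in deleting state. Not adding it to the LeaderAndIsrRequest.", which names both the replica and the partition, unlike the original message.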
   







[GitHub] [kafka] ijuma commented on pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#issuecomment-1030532699


   Are you planning to address the comments @lbradstreet ?





[GitHub] [kafka] lbradstreet commented on pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#issuecomment-1030866772


   > Are you planning to address the comments @lbradstreet ?
   
   @ijuma thanks for the reminder.





[GitHub] [kafka] lbradstreet commented on a change in pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#discussion_r800202592



##########
File path: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
##########
@@ -204,13 +204,17 @@ class ZkReplicaStateMachine(config: KafkaConfig,
                 controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment)
               }
             case _ =>
-              controllerContext.partitionLeadershipInfo(partition) match {
-                case Some(leaderIsrAndControllerEpoch) =>
-                  controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
-                    replica.topicPartition,
-                    leaderIsrAndControllerEpoch,
-                    controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
-                case None =>
+              if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
+                controllerContext.partitionLeadershipInfo(partition) match {
+                  case Some(leaderIsrAndControllerEpoch) =>
+                    controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
+                      replica.topicPartition,
+                      leaderIsrAndControllerEpoch,
+                      controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
+                  case None =>
+                }
+              } else {
+                info(s"OnlineReplica is in deleting state $partition. Not adding it to the LeaderAndIsrRequest")

Review comment:
       Good point.



