Posted to jira@kafka.apache.org by "Pengwei (JIRA)" <ji...@apache.org> on 2017/07/04 08:05:00 UTC

[jira] [Commented] (KAFKA-5553) Delete topic failed to change from OnlineReplica to ReplicaDeletionStarted if ISR not created

    [ https://issues.apache.org/jira/browse/KAFKA-5553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073279#comment-16073279 ] 

Pengwei commented on KAFKA-5553:
--------------------------------

Digging into the logs, I found that the issue happens as follows.
My environment is a single-broker Kafka cluster, with a topic created with one partition and one replica.
1. Due to a network issue, the controller missed handling one topic creation in kafkaController.onNewTopicCreation.

2. The user starts deleting the topic while the topic's /brokers/topics/partitions directory has not been created yet.

3. The controller is re-elected and finds that the topic is being deleted, so in onBrokerStartup:

{code:java}
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
    // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
    // to see if these brokers can become leaders for some/all of those
    partitionStateMachine.triggerOnlinePartitionStateChange()
{code}

The replica for this partition becomes OnlineReplica. However, when the partition is triggered to go online, the controller skips it because the topic is being deleted, so the ISR directory is never created.

4. After that, the delete-topic thread tries to move this replica to OfflineReplica and then delete it:

{code:java}
      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
        new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
{code}

[2016-07-11 20:17:52,962] INFO [Replica state machine on controller 1328]: Invoking state change to OfflineReplica for replicas [Topic=websocket_test_topic,Partition=0,Replica=1328] (kafka.controller.ReplicaStateMachine)

However, in ReplicaStateMachine.handleStateChange, when the target state is OfflineReplica and controllerContext.partitionLeadershipInfo does not contain this topic partition, the replica's state is not changed to OfflineReplica. The state therefore remains OnlineReplica.

The subsequent transition to ReplicaDeletionStarted then fails with the AssertionError from the issue description, and the deletion can never recover.
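The sequence in steps 3 and 4 can be reproduced in a small, self-contained model. This is a hypothetical sketch, not Kafka's controller code. ControllerModel, PartitionAndReplica and the two maps are illustrative stand-ins. Leader election is reduced to "the replica's broker becomes leader". The OfflineReplica branch keeps only the partitionLeadershipInfo check described above: there is no ISR shrink and no StopReplica request.

```scala
// Hypothetical, much-simplified model of the controller state involved in KAFKA-5553.
// Names mirror Kafka's (OnlineReplica, partitionLeadershipInfo, ...) but this is NOT Kafka code.
import scala.collection.mutable

sealed trait ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState

final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

final class ControllerModel(topicsQueuedForDeletion: Set[String]) {
  val replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
  // (topic, partition) -> leader broker; stands in for controllerContext.partitionLeadershipInfo
  val partitionLeadershipInfo = mutable.Map.empty[(String, Int), Int]

  // Step 3: replicas on the new broker go online, then leader election runs,
  // but partitions of topics queued for deletion are skipped, so no leader/ISR is recorded.
  def onBrokerStartup(replicas: Seq[PartitionAndReplica]): Unit = {
    replicas.foreach(r => replicaState(r) = OnlineReplica)
    replicas
      .filterNot(r => topicsQueuedForDeletion.contains(r.topic))
      .foreach(r => partitionLeadershipInfo((r.topic, r.partition)) = r.replica)
  }

  // Step 4: the transitions issued by the delete-topic thread.
  def handleStateChange(r: PartitionAndReplica, target: ReplicaState): Unit = target match {
    case OfflineReplica =>
      // Without leadership info the state is silently left unchanged.
      if (partitionLeadershipInfo.contains((r.topic, r.partition)))
        replicaState(r) = OfflineReplica
    case ReplicaDeletionStarted =>
      // Like assertValidPreviousStates: only OfflineReplica may move to ReplicaDeletionStarted.
      assert(replicaState(r) == OfflineReplica,
        s"Replica $r should be in the OfflineReplica states before moving to ReplicaDeletionStarted state. " +
          s"Instead it is in ${replicaState(r)} state")
      replicaState(r) = ReplicaDeletionStarted
    case OnlineReplica =>
      replicaState(r) = OnlineReplica
  }
}
```

Run this sequence for websocket_test_topic while it is queued for deletion. The replica stays in OnlineReplica after the OfflineReplica transition. The ReplicaDeletionStarted transition then throws a java.lang.AssertionError, as in the reported log. If the topic is not queued for deletion when the broker starts, leadership info is recorded and both transitions succeed.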

> Delete topic failed to change from OnlineReplica to ReplicaDeletionStarted  if ISR not created
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5553
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5553
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.9.0.1, 0.10.2.0, 0.11.0.0
>            Reporter: Pengwei
>
> We found the following error log, and the topic could not be removed for a long time:
> [2016-07-11 20:17:52,965] ERROR Controller 1328 epoch 315 initiated state change of replica 1328 for partition [websocket_test_topic,0] from OnlineReplica to ReplicaDeletionStarted failed (state.change.logger)
> java.lang.AssertionError: assertion failed: Replica [Topic=websocket_test_topic,Partition=0,Replica=1328] should be in the OfflineReplica states before moving to ReplicaDeletionStarted state. Instead it is in OnlineReplica state
> 	at scala.Predef$.assert(Predef.scala:165)
> 	at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309)
> 	at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190)
> 	at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> 	at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
> 	at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322)
> 	at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
> 	at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344)
> 	at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
> 	at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> 	at kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
> 	at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
> 	at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
> 	at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
> 	at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> 	at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
> 	at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
> 	at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
> 	at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> 	at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
> 	at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> 	at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> 	at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
> 	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


