You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/05/17 23:56:15 UTC

[kafka] branch trunk updated: MINOR: Remove spammy log message during topic deletion

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d6057a9  MINOR: Remove spammy log message during topic deletion
d6057a9 is described below

commit d6057a9fe44b5e9521dd679b0dc1e6ccfec866c2
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 17 16:55:55 2019 -0700

    MINOR: Remove spammy log message during topic deletion
    
    Deletion of a large number of topics can cause a ton of log spam. In a test case on 2.2, deletion of 50 topics with 100 partitions each caused about 158 MB of data in the controller log. With the improvements to batch StopReplica and the patch here, we reduce that to about 1.5 MB.
    
    Kudos to gwenshap for spotting these spammy messages.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Gwen Shapira
    
    Closes #6738 from hachikuji/remove-verbose-topic-deletion-log-message
---
 .../scala/kafka/controller/KafkaController.scala     |  6 ++++--
 .../kafka/controller/TopicDeletionManager.scala      | 20 ++++++--------------
 2 files changed, 10 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0234c3b..2034c74 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -609,7 +609,8 @@ class KafkaController(val config: KafkaConfig,
               // first register ISR change listener
               reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
               // mark topic ineligible for deletion for the partitions being reassigned
-              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
+              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
+                reason = "topic reassignment in progress")
               onPartitionReassignment(tp, reassignedPartitionContext)
             } catch {
               case e: ControllerMovedException =>
@@ -1385,7 +1386,8 @@ class KafkaController(val config: KafkaConfig,
           val partitionReassignmentInProgress =
             controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
           if (partitionReassignmentInProgress)
-            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
+            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
+              reason = "topic reassignment in progress")
         }
         // add topic to deletion list
         topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 25d9faf..b15b795 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -154,7 +154,7 @@ class TopicDeletionManager(config: KafkaConfig,
         val topics = replicasThatFailedToDelete.map(_.topic)
         debug(s"Deletion failed for replicas ${replicasThatFailedToDelete.mkString(",")}. Halting deletion for topics $topics")
         replicaStateMachine.handleStateChanges(replicasThatFailedToDelete.toSeq, ReplicaDeletionIneligible)
-        markTopicIneligibleForDeletion(topics)
+        markTopicIneligibleForDeletion(topics, reason = "replica deletion failure")
         resumeDeletions()
       }
     }
@@ -166,12 +166,12 @@ class TopicDeletionManager(config: KafkaConfig,
    * 2. partition reassignment in progress for some partitions of the topic
    * @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
    */
-  def markTopicIneligibleForDeletion(topics: Set[String]): Unit = {
+  def markTopicIneligibleForDeletion(topics: Set[String], reason: => String): Unit = {
     if (isDeleteTopicEnabled) {
       val newTopicsToHaltDeletion = controllerContext.topicsToBeDeleted & topics
       controllerContext.topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
       if (newTopicsToHaltDeletion.nonEmpty)
-        info(s"Halted deletion of topics ${newTopicsToHaltDeletion.mkString(",")}")
+        info(s"Halted deletion of topics ${newTopicsToHaltDeletion.mkString(",")} due to $reason")
     }
   }
 
@@ -231,7 +231,7 @@ class TopicDeletionManager(config: KafkaConfig,
   private def retryDeletionForIneligibleReplicas(topic: String): Unit = {
     // reset replica states from ReplicaDeletionIneligible to OfflineReplica
     val failedReplicas = controllerContext.replicasInState(topic, ReplicaDeletionIneligible)
-    info(s"Retrying delete topic for topic $topic since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
+    info(s"Retrying deletion of topic $topic since replicas ${failedReplicas.mkString(",")} were not successfully deleted")
     replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
   }
 
@@ -305,7 +305,7 @@ class TopicDeletionManager(config: KafkaConfig,
       replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted)
       if (deadReplicasForTopic.nonEmpty) {
         debug(s"Dead Replicas (${deadReplicasForTopic.mkString(",")}) found for topic $topic")
-        markTopicIneligibleForDeletion(Set(topic))
+        markTopicIneligibleForDeletion(Set(topic), reason = "offline replicas")
       }
     }
   }
@@ -338,13 +338,7 @@ class TopicDeletionManager(config: KafkaConfig,
         // clear up all state for this topic from controller cache and zookeeper
         completeDeleteTopic(topic)
         info(s"Deletion of topic $topic successfully completed")
-      } else if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
-        // ignore since topic deletion is in progress
-        val replicasInDeletionStartedState = controllerContext.replicasInState(topic, ReplicaDeletionStarted)
-        val replicaIds = replicasInDeletionStartedState.map(_.replica)
-        val partitions = replicasInDeletionStartedState.map(_.topicPartition)
-        info(s"Deletion for replicas ${replicaIds.mkString(",")} for partition ${partitions.mkString(",")} of topic $topic in progress")
-      } else {
+      } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
         // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
         // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
         // or there is at least one failed replica (which means topic deletion should be retried).
@@ -358,8 +352,6 @@ class TopicDeletionManager(config: KafkaConfig,
         info(s"Deletion of topic $topic (re)started")
         // topic deletion will be kicked off
         onTopicDeletion(Set(topic))
-      } else if (isTopicIneligibleForDeletion(topic)) {
-        info(s"Not retrying deletion of topic $topic at this time since it is marked ineligible for deletion")
       }
     }
   }