Posted to commits@kafka.apache.org by jj...@apache.org on 2016/11/30 18:40:41 UTC

kafka git commit: KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat and Sumant Tambe

Repository: kafka
Updated Branches:
  refs/heads/trunk 1503f7603 -> 497e669dd


KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat <gh...@gmail.com> and Sumant Tambe <su...@yahoo.com>

The last patch submitted by MayureshGharat (back in Dec '15) has been rebased onto the latest trunk. I took care of a couple of test failures (MetricsTest) along the way. jjkoshy, granders, avianey: you may be interested in this PR.

Author: Sumant Tambe <su...@yahoo.com>
Author: Mayuresh Gharat <mg...@mgharat-ld1.linkedin.biz>
Author: MayureshGharat <gh...@gmail.com>

Reviewers: Joel Koshy <jj...@gmail.com>

Closes #1664 from sutambe/async-delete-topic
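
In outline, the patch replaces the synchronous LogManager.deleteLog with a
two-phase scheme: asyncDelete() closes the log and renames its directory to
"topic-partition.<uuid>-delete" on the caller's thread (cheap), and a periodic
"kafka-delete-logs" task removes the renamed directories from disk later
(expensive). A minimal standalone Scala sketch of the idea (the names mirror
the patch; everything else is simplified):

    import java.io.File
    import java.util.UUID
    import java.util.concurrent.LinkedBlockingQueue

    object AsyncDeleteSketch {
      val logsToBeDeleted = new LinkedBlockingQueue[File]()

      // Fast path, on the request-handling thread: rename and enqueue only.
      def asyncDelete(logDir: File): Unit = {
        val uuid = UUID.randomUUID.toString.replaceAll("-", "")
        val renamed = new File(logDir.getParent, s"${logDir.getName}.$uuid-delete")
        if (!logDir.renameTo(renamed))
          throw new RuntimeException(s"Failed to rename ${logDir.getAbsolutePath}")
        logsToBeDeleted.put(renamed)
      }

      // Slow path, run periodically on a background thread.
      def deleteLogs(): Unit =
        while (!logsToBeDeleted.isEmpty) {
          val dir = logsToBeDeleted.take()
          Option(dir.listFiles()).foreach(_.foreach(_.delete()))
          dir.delete()
        }
    }

The rename is fast, so delete-topic no longer blocks a request handler on the
potentially large disk I/O of removing segment files.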


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/497e669d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/497e669d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/497e669d

Branch: refs/heads/trunk
Commit: 497e669dd806fc984f5dceaede4a1f40f4e77c48
Parents: 1503f76
Author: Mayuresh Gharat <gh...@gmail.com>
Authored: Wed Nov 30 10:40:31 2016 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Wed Nov 30 10:40:31 2016 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  5 +-
 .../main/scala/kafka/log/AbstractIndex.scala    | 23 ++---
 core/src/main/scala/kafka/log/Log.scala         | 31 ++++---
 core/src/main/scala/kafka/log/LogManager.scala  | 91 ++++++++++++++++----
 .../scala/kafka/server/ReplicaManager.scala     | 17 ++--
 5 files changed, 114 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4d3fb56..44d6a77 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -144,12 +144,13 @@ class Partition(val topic: String,
       assignedReplicaMap.clear()
       inSyncReplicas = Set.empty[Replica]
       leaderReplicaIdOpt = None
+      val topicPartition = TopicAndPartition(topic, partitionId)
       try {
-        logManager.deleteLog(TopicAndPartition(topic, partitionId))
+        logManager.asyncDelete(topicPartition)
         removePartitionMetrics()
       } catch {
         case e: IOException =>
-          fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e)
+          fatal(s"Error deleting the log for partition $topicPartition", e)
           Runtime.getRuntime().halt(1)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index d594f18..77ef0f7 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -33,11 +33,11 @@ import scala.math.ceil
 /**
  * The abstract index class which holds entry format agnostic methods.
  *
- * @param _file The index file
+ * @param file The index file
  * @param baseOffset the base offset of the segment that this index is corresponding to.
  * @param maxIndexSize The maximum index size in bytes.
  */
-abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
+abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
     extends Logging {
 
   protected def entrySize: Int
@@ -46,8 +46,8 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
 
   @volatile
   protected var mmap: MappedByteBuffer = {
-    val newlyCreated = _file.createNewFile()
-    val raf = new RandomAccessFile(_file, "rw")
+    val newlyCreated = file.createNewFile()
+    val raf = new RandomAccessFile(file, "rw")
     try {
       /* pre-allocate the file if necessary */
       if(newlyCreated) {
@@ -92,11 +92,6 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
   def entries: Int = _entries
 
   /**
-   * The index file
-   */
-  def file: File = _file
-
-  /**
    * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
    * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
    * loading segments from disk or truncating back to an old segment where a new log segment became active;
@@ -104,7 +99,7 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
    */
   def resize(newSize: Int) {
     inLock(lock) {
-      val raf = new RandomAccessFile(_file, "rw")
+      val raf = new RandomAccessFile(file, "rw")
       val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
       val position = mmap.position
 
@@ -128,8 +123,8 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
    * @throws IOException if rename fails
    */
   def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
-    finally _file = f
+    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+    finally file = f
   }
 
   /**
@@ -145,10 +140,10 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
    * Delete this index file
    */
   def delete(): Boolean = {
-    info(s"Deleting index ${_file.getAbsolutePath}")
+    info(s"Deleting index ${file.getAbsolutePath}")
     if(Os.isWindows)
       CoreUtils.swallow(forceUnmap(mmap))
-    _file.delete()
+    file.delete()
   }
 
   /**

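The change above is plumbing for the rename: AbstractIndex previously kept the
index file in a private _file field behind a read-only accessor, and the delete
path needs to repoint every segment's index file at the renamed directory.
Making it a public @volatile var lets the rename path swap the handle while
concurrent readers observe the new value without extra locking. A simplified
sketch of that contract (hypothetical class, same pattern):

    import java.io.File

    class IndexSketch(@volatile var file: File) {
      // Mirrors renameTo() in the patch: move on disk, then repoint the
      // handle; the finally block repoints even if the move throws.
      def renameTo(f: File): Unit =
        try {
          if (!file.renameTo(f))
            throw new java.io.IOException(s"rename failed: ${file.getAbsolutePath}")
        } finally file = f
    }
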
http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 4c286d2..9e3dfac 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -83,7 +83,7 @@ case class LogAppendInfo(var firstOffset: Long,
  *
  */
 @threadsafe
-class Log(val dir: File,
+class Log(@volatile var dir: File,
           @volatile var config: LogConfig,
           @volatile var recoveryPoint: Long = 0L,
           scheduler: Scheduler,
@@ -222,7 +222,6 @@ class Log(val dir: File,
           error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
           segment.recover(config.maxMessageSize)
         }
-
         segments.put(start, segment)
       }
     }
@@ -263,12 +262,13 @@ class Log(val dir: File,
                                      initFileSize = this.initFileSize(),
                                      preallocate = config.preallocate))
     } else {
-      recoverLog()
-      // reset the index size of the currently active log segment to allow more entries
-      activeSegment.index.resize(config.maxIndexSize)
-      activeSegment.timeIndex.resize(config.maxIndexSize)
+      if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
+        recoverLog()
+        // reset the index size of the currently active log segment to allow more entries
+        activeSegment.index.resize(config.maxIndexSize)
+        activeSegment.timeIndex.resize(config.maxIndexSize)
+      }
     }
-
   }
 
   private def updateLogEndOffset(messageOffset: Long) {
@@ -833,7 +833,6 @@ class Log(val dir: File,
    */
   private[log] def delete() {
     lock synchronized {
-      removeLogMetrics()
       logSegments.foreach(_.delete())
       segments.clear()
       Utils.delete(dir)
@@ -1046,6 +1045,9 @@ object Log {
   /** TODO: Get rid of CleanShutdownFile in 0.8.2 */
   val CleanShutdownFile = ".kafka_cleanshutdown"
 
+  /** a directory that is scheduled to be deleted */
+  val DeleteDirSuffix = "-delete"
+
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically.
@@ -1092,10 +1094,18 @@ object Log {
    * Parse the topic and partition out of the directory name of a log
    */
   def parseTopicPartitionName(dir: File): TopicAndPartition = {
-    val name: String = dir.getName
-    if (name == null || name.isEmpty || !name.contains('-')) {
+    val dirName = dir.getName
+    if (dirName == null || dirName.isEmpty || !dirName.contains('-')) {
       throwException(dir)
     }
+
+    val name: String =
+      if (dirName.endsWith(DeleteDirSuffix)) {
+        dirName.substring(0, dirName.indexOf('.'))
+      } else {
+        dirName
+      }
+
     val index = name.lastIndexOf('-')
     val topic: String = name.substring(0, index)
     val partition: String = name.substring(index + 1)
@@ -1105,6 +1115,7 @@ object Log {
     TopicAndPartition(topic, partition.toInt)
   }
 
+
   def throwException(dir: File) {
     throw new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
       "'" + dir.getName + "' is not in the form of topic-partition\n" +

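Since a renamed directory like "mytopic-0.3bc1ab90d4e211e6bdf40800200c9a66-delete"
must still map back to its topic and partition, parseTopicPartitionName now
strips everything from the first '.' onward before splitting on the last '-'.
A standalone sketch of the rule (hypothetical helper name):

    object ParseSketch {
      def parseDirName(dirName: String): (String, Int) = {
        val DeleteDirSuffix = "-delete"
        val name =
          if (dirName.endsWith(DeleteDirSuffix)) dirName.substring(0, dirName.indexOf('.'))
          else dirName
        val i = name.lastIndexOf('-')
        (name.substring(0, i), name.substring(i + 1).toInt)
      }
      // parseDirName("mytopic-0")                                         == ("mytopic", 0)
      // parseDirName("mytopic-0.3bc1ab90d4e211e6bdf40800200c9a66-delete") == ("mytopic", 0)
    }

Note that, like the patch, the sketch assumes the topic name itself contains no
'.'; a directory for a topic such as "my.topic" would be truncated at its first
dot once the -delete suffix is present.
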
http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a5beb49..64b277a 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -18,13 +18,13 @@
 package kafka.log
 
 import java.io._
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 
 import kafka.utils._
 
 import scala.collection._
 import scala.collection.JavaConverters._
-import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.common.{KafkaStorageException, KafkaException, TopicAndPartition}
 import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
 import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
 
@@ -53,8 +53,10 @@ class LogManager(val logDirs: Array[File],
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000
+
   private val logCreationOrDeletionLock = new Object
   private val logs = new Pool[TopicAndPartition, Log]()
+  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
 
   createAndValidateLogDirs(logDirs)
   private val dirLocks = lockLogDirs(logDirs)
@@ -150,12 +152,15 @@ class LogManager(val logDirs: Array[File],
           val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
 
           val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
-          val previous = this.logs.put(topicPartition, current)
-
-          if (previous != null) {
-            throw new IllegalArgumentException(
-              "Duplicate log directories found: %s, %s!".format(
-              current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+          if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
+            this.logsToBeDeleted.add(current)
+          } else {
+            val previous = this.logs.put(topicPartition, current)
+            if (previous != null) {
+              throw new IllegalArgumentException(
+                "Duplicate log directories found: %s, %s!".format(
+                  current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+            }
           }
         }
       }
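
Together with the Log.scala change above (which skips recoverLog() and index
resizing for directories carrying the -delete suffix), this makes deletion
survive a broker bounce: a directory that was renamed but not yet removed is
routed straight back onto the deletion queue at startup instead of being
recovered as a live log. A simplified sketch of the routing decision
(hypothetical names, stubbed normal path):

    import java.io.File
    import java.util.concurrent.LinkedBlockingQueue

    object StartupRouting {
      val logsToBeDeleted = new LinkedBlockingQueue[File]()

      // Stand-in for the normal load path (recovery, index resize, logs map).
      def recoverAndRegister(dir: File): Unit = println(s"recovering ${dir.getName}")

      def route(logDir: File): Unit =
        if (logDir.getName.endsWith("-delete")) logsToBeDeleted.add(logDir)
        else recoverAndRegister(logDir)
    }
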
@@ -204,6 +209,11 @@ class LogManager(val logDirs: Array[File],
                          delay = InitialTaskDelayMs,
                          period = flushCheckpointMs,
                          TimeUnit.MILLISECONDS)
+      scheduler.schedule("kafka-delete-logs",
+                         deleteLogs,
+                         delay = InitialTaskDelayMs,
+                         period = defaultConfig.fileDeleteDelayMs,
+                         TimeUnit.MILLISECONDS)
     }
     if(cleanerConfig.enableCleaner)
       cleaner.startup()
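
The new task piggybacks on the existing scheduler, firing after the same
30-second initial delay as the other background jobs and then every
file.delete.delay.ms. A self-contained sketch with a plain
ScheduledExecutorService standing in for KafkaScheduler (assumed delay values):

    import java.util.concurrent.{Executors, TimeUnit}

    object DeleteLogsTaskSketch extends App {
      val initialTaskDelayMs = 30 * 1000L
      val fileDeleteDelayMs  = 60 * 1000L  // stand-in for file.delete.delay.ms

      val scheduler = Executors.newSingleThreadScheduledExecutor()
      scheduler.scheduleAtFixedRate(new Runnable {
        def run(): Unit = println("deleteLogs(): draining logsToBeDeleted")
      }, initialTaskDelayMs, fileDeleteDelayMs, TimeUnit.MILLISECONDS)
    }
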
@@ -376,24 +386,69 @@ class LogManager(val logDirs: Array[File],
   }
 
   /**
-   *  Delete a log.
+   *  Delete logs marked for deletion.
    */
-  def deleteLog(topicAndPartition: TopicAndPartition) {
-    var removedLog: Log = null
-    logCreationOrDeletionLock synchronized {
-      removedLog = logs.remove(topicAndPartition)
+  private def deleteLogs(): Unit = {
+    try {
+      var failed = 0
+      while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) {
+        val removedLog = logsToBeDeleted.take()
+        if (removedLog != null) {
+          try {
+            removedLog.delete()
+            info(s"Deleted log for partition ${removedLog.topicAndPartition} in ${removedLog.dir.getAbsolutePath}.")
+          } catch {
+            case e: Throwable =>
+              error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
+              failed = failed + 1
+              logsToBeDeleted.put(removedLog)
+          }
+        }
+      }
+    } catch {
+      case e: Throwable => 
+        error(s"Exception in kafka-delete-logs thread.", e)
     }
+  }
+
+  /**
+    * Rename the directory of the given topic-partition "logdir" to "logdir.uuid-delete" and
+    * add it to the queue for deletion.
+    * @param topicAndPartition TopicAndPartition that needs to be deleted
+    */
+  def asyncDelete(topicAndPartition: TopicAndPartition) = {
+    val removedLog: Log = logCreationOrDeletionLock synchronized {
+                            logs.remove(topicAndPartition)
+                          }
     if (removedLog != null) {
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null) {
         cleaner.abortCleaning(topicAndPartition)
         cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
-      removedLog.delete()
-      info("Deleted log for partition [%s,%d] in %s."
-           .format(topicAndPartition.topic,
-                   topicAndPartition.partition,
-                   removedLog.dir.getAbsolutePath))
+      // renaming the directory to topic-partition.uniqueId-delete
+      val dirName = new StringBuilder(removedLog.name)
+                        .append(".")
+                        .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
+                        .append(Log.DeleteDirSuffix)
+                        .toString()
+      removedLog.close()
+      val renamedDir = new File(removedLog.dir.getParent, dirName)
+      val renameSuccessful = removedLog.dir.renameTo(renamedDir)
+      if (renameSuccessful) {
+        removedLog.dir = renamedDir
+        // change the file pointers for log and index file
+        for (logSegment <- removedLog.logSegments) {
+          logSegment.log.file = new File(renamedDir, logSegment.log.file.getName)
+          logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
+        }
+
+        logsToBeDeleted.add(removedLog)
+        removedLog.removeLogMetrics()
+        info(s"Log for partition ${removedLog.topicAndPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+      } else {
+        throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
+      }
     }
   }
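
Two details of the deleteLogs() loop above are worth calling out: a failed
directory goes back to the tail of the queue, and the loop's guard
(failed < logsToBeDeleted.size()) ends the pass once every remaining entry has
failed this round, leaving them for the next scheduled run rather than
spinning. A reduced sketch of that retry policy (hypothetical generic helper):

    import java.util.concurrent.LinkedBlockingQueue

    object DrainSketch {
      def drainWithRetry[A](queue: LinkedBlockingQueue[A])(delete: A => Unit): Unit = {
        var failed = 0
        while (!queue.isEmpty && failed < queue.size()) {
          val item = queue.take()
          try delete(item)
          catch {
            case e: Throwable =>
              failed += 1
              queue.put(item)  // requeue at the tail; retried on the next pass
          }
        }
      }
    }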
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index febe8ad..d2ec200 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -219,10 +219,11 @@ class ReplicaManager(val config: KafkaConfig,
     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
   }
 
-  def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
-    stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
-      deletePartition.toString, topic, partitionId))
+  def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Short  = {
+    stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition")
     val errorCode = Errors.NONE.code
+    val topic = topicPartition.topic
+    val partitionId = topicPartition.partition
     getPartition(topic, partitionId) match {
       case Some(_) =>
         if (deletePartition) {
@@ -241,14 +242,12 @@ class ReplicaManager(val config: KafkaConfig,
           val topicAndPartition = TopicAndPartition(topic, partitionId)
 
           if(logManager.getLog(topicAndPartition).isDefined) {
-              logManager.deleteLog(topicAndPartition)
+              logManager.asyncDelete(topicAndPartition)
           }
         }
-        stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
-          .format(localBrokerId, deletePartition, topic, partitionId))
+        stateChangeLogger.trace(s"Broker $localBrokerId ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker")
     }
-    stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]"
-      .format(localBrokerId, deletePartition, topic, partitionId))
+    stateChangeLogger.trace(s"Broker $localBrokerId finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
     errorCode
   }
 
@@ -265,7 +264,7 @@ class ReplicaManager(val config: KafkaConfig,
         // First stop fetchers for all partitions, then stop the corresponding replicas
         replicaFetcherManager.removeFetcherForPartitions(partitions)
         for (topicPartition <- partitions){
-          val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions)
+          val errorCode = stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
           responseMap.put(topicPartition, errorCode)
         }
         (responseMap, Errors.NONE.code)
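
Finally, the stopReplica refactor is mechanical: callers now pass the
TopicPartition value through instead of unpacking it into (topic, partitionId)
at every call site, and the delete path lands in logManager.asyncDelete rather
than the old blocking deleteLog. A simplified sketch of the new shape
(hypothetical types, stubbed log manager):

    object StopReplicaSketch {
      case class TopicPartition(topic: String, partition: Int)

      // Stand-in for logManager.asyncDelete.
      def asyncDelete(tp: TopicPartition): Unit = println(s"renaming log dir for $tp")

      def stopReplica(tp: TopicPartition, deletePartition: Boolean): Short = {
        println(s"handling stop replica (delete=$deletePartition) for partition $tp")
        if (deletePartition) asyncDelete(tp)
        0  // Errors.NONE.code
      }
    }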