You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2016/11/30 18:40:41 UTC
kafka git commit: KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat and Sumant Tambe
Repository: kafka
Updated Branches:
refs/heads/trunk 1503f7603 -> 497e669dd
KAFKA-1911; Async delete topic - contributed by Mayuresh Gharat <gh...@gmail.com> and Sumant Tambe <su...@yahoo.com>
The last patch submitted by MayureshGharat (back in Dec 15) has been rebased onto the latest trunk. I fixed a couple of test failures (MetricsTest) along the way. jjkoshy, granders, avianey: you may be interested in this PR.
Author: Sumant Tambe <su...@yahoo.com>
Author: Mayuresh Gharat <mg...@mgharat-ld1.linkedin.biz>
Author: MayureshGharat <gh...@gmail.com>
Reviewers: Joel Koshy <jj...@gmail.com>
Closes #1664 from sutambe/async-delete-topic
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/497e669d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/497e669d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/497e669d
Branch: refs/heads/trunk
Commit: 497e669dd806fc984f5dceaede4a1f40f4e77c48
Parents: 1503f76
Author: Mayuresh Gharat <gh...@gmail.com>
Authored: Wed Nov 30 10:40:31 2016 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Wed Nov 30 10:40:31 2016 -0800
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 5 +-
.../main/scala/kafka/log/AbstractIndex.scala | 23 ++---
core/src/main/scala/kafka/log/Log.scala | 31 ++++---
core/src/main/scala/kafka/log/LogManager.scala | 91 ++++++++++++++++----
.../scala/kafka/server/ReplicaManager.scala | 17 ++--
5 files changed, 114 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4d3fb56..44d6a77 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -144,12 +144,13 @@ class Partition(val topic: String,
assignedReplicaMap.clear()
inSyncReplicas = Set.empty[Replica]
leaderReplicaIdOpt = None
+ val topicPartition = TopicAndPartition(topic, partitionId)
try {
- logManager.deleteLog(TopicAndPartition(topic, partitionId))
+ logManager.asyncDelete(topicPartition)
removePartitionMetrics()
} catch {
case e: IOException =>
- fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e)
+ fatal(s"Error deleting the log for partition $topicPartition", e)
Runtime.getRuntime().halt(1)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index d594f18..77ef0f7 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -33,11 +33,11 @@ import scala.math.ceil
/**
* The abstract index class which holds entry format agnostic methods.
*
- * @param _file The index file
+ * @param file The index file
* @param baseOffset the base offset of the segment that this index is corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
-abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
+abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
extends Logging {
protected def entrySize: Int
@@ -46,8 +46,8 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
@volatile
protected var mmap: MappedByteBuffer = {
- val newlyCreated = _file.createNewFile()
- val raf = new RandomAccessFile(_file, "rw")
+ val newlyCreated = file.createNewFile()
+ val raf = new RandomAccessFile(file, "rw")
try {
/* pre-allocate the file if necessary */
if(newlyCreated) {
@@ -92,11 +92,6 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
def entries: Int = _entries
/**
- * The index file
- */
- def file: File = _file
-
- /**
* Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
* trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
* loading segments from disk or truncating back to an old segment where a new log segment became active;
@@ -104,7 +99,7 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
*/
def resize(newSize: Int) {
inLock(lock) {
- val raf = new RandomAccessFile(_file, "rw")
+ val raf = new RandomAccessFile(file, "rw")
val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
val position = mmap.position
@@ -128,8 +123,8 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
* @throws IOException if rename fails
*/
def renameTo(f: File) {
- try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
- finally _file = f
+ try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+ finally file = f
}
/**
@@ -145,10 +140,10 @@ abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val
* Delete this index file
*/
def delete(): Boolean = {
- info(s"Deleting index ${_file.getAbsolutePath}")
+ info(s"Deleting index ${file.getAbsolutePath}")
if(Os.isWindows)
CoreUtils.swallow(forceUnmap(mmap))
- _file.delete()
+ file.delete()
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 4c286d2..9e3dfac 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -83,7 +83,7 @@ case class LogAppendInfo(var firstOffset: Long,
*
*/
@threadsafe
-class Log(val dir: File,
+class Log(@volatile var dir: File,
@volatile var config: LogConfig,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
@@ -222,7 +222,6 @@ class Log(val dir: File,
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(config.maxMessageSize)
}
-
segments.put(start, segment)
}
}
@@ -263,12 +262,13 @@ class Log(val dir: File,
initFileSize = this.initFileSize(),
preallocate = config.preallocate))
} else {
- recoverLog()
- // reset the index size of the currently active log segment to allow more entries
- activeSegment.index.resize(config.maxIndexSize)
- activeSegment.timeIndex.resize(config.maxIndexSize)
+ if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
+ recoverLog()
+ // reset the index size of the currently active log segment to allow more entries
+ activeSegment.index.resize(config.maxIndexSize)
+ activeSegment.timeIndex.resize(config.maxIndexSize)
+ }
}
-
}
private def updateLogEndOffset(messageOffset: Long) {
@@ -833,7 +833,6 @@ class Log(val dir: File,
*/
private[log] def delete() {
lock synchronized {
- removeLogMetrics()
logSegments.foreach(_.delete())
segments.clear()
Utils.delete(dir)
@@ -1046,6 +1045,9 @@ object Log {
/** TODO: Get rid of CleanShutdownFile in 0.8.2 */
val CleanShutdownFile = ".kafka_cleanshutdown"
+ /** a directory that is scheduled to be deleted */
+ val DeleteDirSuffix = "-delete"
+
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
@@ -1092,10 +1094,18 @@ object Log {
* Parse the topic and partition out of the directory name of a log
*/
def parseTopicPartitionName(dir: File): TopicAndPartition = {
- val name: String = dir.getName
- if (name == null || name.isEmpty || !name.contains('-')) {
+ val dirName = dir.getName
+ if (dirName == null || dirName.isEmpty || !dirName.contains('-')) {
throwException(dir)
}
+
+ val name: String =
+ if (dirName.endsWith(DeleteDirSuffix)) {
+ dirName.substring(0, dirName.lastIndexOf('.')) // topic names may contain '.', so strip only the trailing ".<uuid>-delete"
+ } else {
+ dirName
+ }
+
val index = name.lastIndexOf('-')
val topic: String = name.substring(0, index)
val partition: String = name.substring(index + 1)
@@ -1105,6 +1115,7 @@ object Log {
TopicAndPartition(topic, partition.toInt)
}
+
def throwException(dir: File) {
throw new KafkaException("Found directory " + dir.getCanonicalPath + ", " +
"'" + dir.getName + "' is not in the form of topic-partition\n" +
http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a5beb49..64b277a 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -18,13 +18,13 @@
package kafka.log
import java.io._
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import kafka.utils._
import scala.collection._
import scala.collection.JavaConverters._
-import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.common.{KafkaStorageException, KafkaException, TopicAndPartition}
import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
@@ -53,8 +53,10 @@ class LogManager(val logDirs: Array[File],
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
+
private val logCreationOrDeletionLock = new Object
private val logs = new Pool[TopicAndPartition, Log]()
+ private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
@@ -150,12 +152,15 @@ class LogManager(val logDirs: Array[File],
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
- val previous = this.logs.put(topicPartition, current)
-
- if (previous != null) {
- throw new IllegalArgumentException(
- "Duplicate log directories found: %s, %s!".format(
- current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+ if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
+ this.logsToBeDeleted.add(current)
+ } else {
+ val previous = this.logs.put(topicPartition, current)
+ if (previous != null) {
+ throw new IllegalArgumentException(
+ "Duplicate log directories found: %s, %s!".format(
+ current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+ }
}
}
}
@@ -204,6 +209,11 @@ class LogManager(val logDirs: Array[File],
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
+ scheduler.schedule("kafka-delete-logs",
+ deleteLogs,
+ delay = InitialTaskDelayMs,
+ period = defaultConfig.fileDeleteDelayMs,
+ TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
@@ -376,24 +386,69 @@ class LogManager(val logDirs: Array[File],
}
/**
- * Delete a log.
+ * Delete logs marked for deletion.
*/
- def deleteLog(topicAndPartition: TopicAndPartition) {
- var removedLog: Log = null
- logCreationOrDeletionLock synchronized {
- removedLog = logs.remove(topicAndPartition)
+ private def deleteLogs(): Unit = {
+ try {
+ var failed = 0
+ while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) {
+ val removedLog = logsToBeDeleted.take()
+ if (removedLog != null) {
+ try {
+ removedLog.delete()
+ info(s"Deleted log for partition ${removedLog.topicAndPartition} in ${removedLog.dir.getAbsolutePath}.")
+ } catch {
+ case e: Throwable =>
+ error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
+ failed = failed + 1
+ logsToBeDeleted.put(removedLog)
+ }
+ }
+ }
+ } catch {
+ case e: Throwable =>
+ error(s"Exception in kafka-delete-logs thread.", e)
}
+}
+
+ /**
+ * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
+ * add it in the queue for deletion.
+ * @param topicAndPartition TopicPartition that needs to be deleted
+ */
+ def asyncDelete(topicAndPartition: TopicAndPartition) = {
+ val removedLog: Log = logCreationOrDeletionLock synchronized {
+ logs.remove(topicAndPartition)
+ }
if (removedLog != null) {
//We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null) {
cleaner.abortCleaning(topicAndPartition)
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
- removedLog.delete()
- info("Deleted log for partition [%s,%d] in %s."
- .format(topicAndPartition.topic,
- topicAndPartition.partition,
- removedLog.dir.getAbsolutePath))
+ // renaming the directory to topic-partition.uniqueId-delete
+ val dirName = new StringBuilder(removedLog.name)
+ .append(".")
+ .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
+ .append(Log.DeleteDirSuffix)
+ .toString()
+ removedLog.close()
+ val renamedDir = new File(removedLog.dir.getParent, dirName)
+ val renameSuccessful = removedLog.dir.renameTo(renamedDir)
+ if (renameSuccessful) {
+ removedLog.dir = renamedDir
+ // change the file pointers for log and index file
+ for (logSegment <- removedLog.logSegments) {
+ logSegment.log.file = new File(renamedDir, logSegment.log.file.getName)
+ logSegment.index.file = new File(renamedDir, logSegment.index.file.getName)
+ }
+
+ logsToBeDeleted.add(removedLog)
+ removedLog.removeLogMetrics()
+ info(s"Log for partition ${removedLog.topicAndPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+ } else {
+ throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/497e669d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index febe8ad..d2ec200 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -219,10 +219,11 @@ class ReplicaManager(val config: KafkaConfig,
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
}
- def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
- stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
- deletePartition.toString, topic, partitionId))
+ def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Short = {
+ stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition")
val errorCode = Errors.NONE.code
+ val topic = topicPartition.topic
+ val partitionId = topicPartition.partition
getPartition(topic, partitionId) match {
case Some(_) =>
if (deletePartition) {
@@ -241,14 +242,12 @@ class ReplicaManager(val config: KafkaConfig,
val topicAndPartition = TopicAndPartition(topic, partitionId)
if(logManager.getLog(topicAndPartition).isDefined) {
- logManager.deleteLog(topicAndPartition)
+ logManager.asyncDelete(topicAndPartition)
}
}
- stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
- .format(localBrokerId, deletePartition, topic, partitionId))
+ stateChangeLogger.trace(s"Broker $localBrokerId ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker")
}
- stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]"
- .format(localBrokerId, deletePartition, topic, partitionId))
+ stateChangeLogger.trace(s"Broker $localBrokerId finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
errorCode
}
@@ -265,7 +264,7 @@ class ReplicaManager(val config: KafkaConfig,
// First stop fetchers for all partitions, then stop the corresponding replicas
replicaFetcherManager.removeFetcherForPartitions(partitions)
for (topicPartition <- partitions){
- val errorCode = stopReplica(topicPartition.topic, topicPartition.partition, stopReplicaRequest.deletePartitions)
+ val errorCode = stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
responseMap.put(topicPartition, errorCode)
}
(responseMap, Errors.NONE.code)