You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/01/08 07:32:22 UTC

git commit: kafka-1074; Reassign partitions should delete the old replicas from disk; patched by Jun Rao; reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang

Updated Branches:
  refs/heads/trunk 15f3c8417 -> f63e3f730


kafka-1074; Reassign partitions should delete the old replicas from disk; patched by Jun Rao; reviewed by Jay Kreps, Neha Narkhede and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f63e3f73
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f63e3f73
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f63e3f73

Branch: refs/heads/trunk
Commit: f63e3f730673f60ca179030cf0d64380689c2a40
Parents: 15f3c84
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Jan 7 22:36:00 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 7 22:36:00 2014 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  19 +-
 .../common/LogCleaningAbortedException.scala    |  24 +++
 .../common/OptimisticLockFailureException.scala |  23 --
 .../kafka/common/ThreadShutdownException.scala  |  24 +++
 .../kafka/controller/KafkaController.scala      |   2 +-
 core/src/main/scala/kafka/log/Log.scala         |  37 ++--
 core/src/main/scala/kafka/log/LogCleaner.scala  | 208 +++++++------------
 .../scala/kafka/log/LogCleanerManager.scala     | 188 +++++++++++++++++
 core/src/main/scala/kafka/log/LogManager.scala  |  76 ++++---
 .../kafka/server/ReplicaFetcherThread.scala     |   7 +-
 .../scala/kafka/server/ReplicaManager.scala     |  18 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  15 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |  39 ++--
 13 files changed, 452 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5c9307d..1087a2e 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -25,10 +25,11 @@ import kafka.log.LogConfig
 import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
-import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
+import kafka.controller.KafkaController
 import org.apache.log4j.Logger
 import kafka.message.ByteBufferMessageSet
 import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
+import java.io.IOException
 
 
 /**
@@ -135,6 +136,22 @@ class Partition(val topic: String,
     assignedReplicaMap.remove(replicaId)
   }
 
+  def delete() {
+    // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted
+    leaderIsrUpdateLock synchronized {
+      assignedReplicaMap.clear()
+      inSyncReplicas = Set.empty[Replica]
+      leaderReplicaIdOpt = None
+      try {
+        logManager.deleteLog(TopicAndPartition(topic, partitionId))
+      } catch {
+        case e: IOException =>
+          fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e)
+          Runtime.getRuntime().halt(1)
+      }
+    }
+  }
+
   def getLeaderEpoch(): Int = {
     leaderIsrUpdateLock synchronized {
       return this.leaderEpoch

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala b/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
new file mode 100644
index 0000000..5ea6632
--- /dev/null
+++ b/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a log cleaning task is requested to be aborted.
+ */
+class LogCleaningAbortedException() extends RuntimeException() {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala b/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
deleted file mode 100644
index 0e69110..0000000
--- a/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Thrown when an optimistic locking attempt receives concurrent modifications
- */
-class OptimisticLockFailureException(message: String) extends RuntimeException(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/common/ThreadShutdownException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ThreadShutdownException.scala b/core/src/main/scala/kafka/common/ThreadShutdownException.scala
new file mode 100644
index 0000000..6554a5e
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ThreadShutdownException.scala
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * An exception that indicates a thread is being shut down normally.
+ */
+class ThreadShutdownException() extends RuntimeException {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6215cb8..03ef9cf 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -413,7 +413,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * 5. Stop old replicas AR - RAR. As part of this, we make 2 state changes OfflineReplica and NonExistentReplica. As part
    *    of OfflineReplica state change, we shrink the ISR to remove RAR - AR in zookeeper and sent a LeaderAndIsr ONLY to
    *    the Leader to notify it of the shrunk ISR. After that, we send a StopReplica (delete = false) to the replicas in
-   *    RAR - AR. Currently, NonExistentReplica state change is a NO-OP
+   *    AR - RAR. As part of the NonExistentReplica state change, we delete replicas in AR - RAR.
    * 6. Write new AR = RAR. As part of this, we finally change the AR in zookeeper to RAR.
    * 7. Remove partition from the /admin/reassign_partitions path
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index beda421..b3ab522 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -63,12 +63,11 @@ class Log(val dir: File,
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
   loadSegments()
   
-  /* The number of times the log has been truncated */
-  private val truncates = new AtomicInteger(0)
-    
   /* Calculate the offset of the next message */
   private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
 
+  val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
+
   info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
 
   newGauge(name + "-" + "NumLogSegments",
@@ -202,11 +201,6 @@ class Log(val dir: File,
   def numberOfSegments: Int = segments.size
   
   /**
-   * The number of truncates that have occurred since the log was opened.
-   */
-  def numberOfTruncates: Int = truncates.get
-
-  /**
    * Close this log
    */
   def close() {
@@ -524,16 +518,19 @@ class Log(val dir: File,
   /**
    * Completely delete this log directory and all contents from the file system with no delay
    */
-  def delete(): Unit = {
-    logSegments.foreach(_.delete())
-    Utils.rm(dir)
+  private[log] def delete() {
+    lock synchronized {
+      logSegments.foreach(_.delete())
+      segments.clear()
+      Utils.rm(dir)
+    }
   }
 
   /**
    * Truncate this log so that it ends with the greatest offset < targetOffset.
    * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
    */
-  def truncateTo(targetOffset: Long) {
+  private[log] def truncateTo(targetOffset: Long) {
     info("Truncating log %s to offset %d.".format(name, targetOffset))
     if(targetOffset < 0)
       throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
@@ -551,7 +548,6 @@ class Log(val dir: File,
         this.nextOffset.set(targetOffset)
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
       }
-      truncates.getAndIncrement
     }
   }
     
@@ -559,7 +555,7 @@ class Log(val dir: File,
    *  Delete all data in the log and start at the new offset
    *  @param newOffset The new offset to start the log with
    */
-  def truncateFullyAndStartAt(newOffset: Long) {
+  private[log] def truncateFullyAndStartAt(newOffset: Long) {
     debug("Truncate and start log '" + name + "' to " + newOffset)
     lock synchronized {
       val segmentsToDelete = logSegments.toList
@@ -571,7 +567,6 @@ class Log(val dir: File,
                                 time = time))
       this.nextOffset.set(newOffset)
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
-      truncates.getAndIncrement
     }
   }
 
@@ -650,10 +645,8 @@ class Log(val dir: File,
    * @param newSegment The new log segment to add to the log
    * @param oldSegments The old log segments to delete from the log
    */
-  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], expectedTruncates: Int) {
+  private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment]) {
     lock synchronized {
-      if(expectedTruncates != numberOfTruncates)
-        throw new OptimisticLockFailureException("The log has been truncated, expected %d but found %d.".format(expectedTruncates, numberOfTruncates))
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
       newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
@@ -735,5 +728,13 @@ object Log {
   def indexFilename(dir: File, offset: Long) = 
     new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
   
+
+  /**
+   * Parse the topic and partition out of the directory name of a log
+   */
+  def parseTopicPartitionName(name: String): TopicAndPartition = {
+    val index = name.lastIndexOf('-')
+    TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
+  }
 }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index ccde2ab..6404647 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -20,15 +20,12 @@ package kafka.log
 import scala.collection._
 import scala.math
 import java.nio._
-import java.util.concurrent.Semaphore
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic._
 import java.util.Date
 import java.io.File
 import kafka.common._
 import kafka.message._
-import kafka.server.OffsetCheckpoint
 import kafka.utils._
+import java.lang.IllegalStateException
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
@@ -67,19 +64,9 @@ class LogCleaner(val config: CleanerConfig,
                  val logDirs: Array[File],
                  val logs: Pool[TopicAndPartition, Log], 
                  time: Time = SystemTime) extends Logging {
-    
-  /* the offset checkpoints holding the last cleaned point for each log */
-  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
-  
-  /* the set of logs currently being cleaned */
-  private val inProgress = mutable.HashSet[TopicAndPartition]()
-  
-  /* a global lock used to control all access to the in-progress set and the offset checkpoints */
-  private val lock = new Object
-    
-  /* a counter for creating unique thread names*/
-  private val threadId = new AtomicInteger(0)
-  
+  /* for managing the state of partitions being cleaned. */
+  private val cleanerManager = new LogCleanerManager(logDirs, logs);
+
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, 
                                         checkIntervalMs = 300, 
@@ -87,10 +74,7 @@ class LogCleaner(val config: CleanerConfig,
                                         time = time)
   
   /* the threads */
-  private val cleaners = (0 until config.numThreads).map(_ => new CleanerThread())
-  
-  /* a hook for testing to synchronize on log cleaning completions */
-  private val cleaned = new Semaphore(0)
+  private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
   
   /**
    * Start the background cleaning
@@ -105,102 +89,79 @@ class LogCleaner(val config: CleanerConfig,
    */
   def shutdown() {
     info("Shutting down the log cleaner.")
-    cleaners.foreach(_.interrupt())
-    cleaners.foreach(_.join())
+    cleaners.foreach(_.shutdown())
   }
   
   /**
-   * For testing, a way to know when work has completed. This method blocks until the 
-   * cleaner has processed up to the given offset on the specified topic/partition
+   *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
+   *  the partition is aborted.
    */
-  def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
-    while(!allCleanerCheckpoints.contains(TopicAndPartition(topic, part)))
-      cleaned.tryAcquire(timeout, TimeUnit.MILLISECONDS)
+  def abortCleaning(topicAndPartition: TopicAndPartition) {
+    cleanerManager.abortCleaning(topicAndPartition)
   }
-  
+
   /**
-   * @return the position processed for all logs.
+   *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
+   *  This call blocks until the cleaning of the partition is aborted and paused.
    */
-  def allCleanerCheckpoints(): Map[TopicAndPartition, Long] = 
-    checkpoints.values.flatMap(_.read()).toMap
-  
-   /**
-    * Choose the log to clean next and add it to the in-progress set. We recompute this
-    * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
-    * the log manager maintains.
-    */
-  private def grabFilthiestLog(): Option[LogToClean] = {
-    lock synchronized {
-      val lastClean = allCleanerCheckpoints()      
-      val cleanableLogs = logs.filter(l => l._2.config.dedupe)                                     // skip any logs marked for delete rather than dedupe
-                              .filterNot(l => inProgress.contains(l._1))                           // skip any logs already in-progress
-                              .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0)))      // create a LogToClean instance for each
-      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)                                  // must have some bytes
-                                   .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
-      if(dirtyLogs.isEmpty) {
-        None
-      } else {
-        val filthiest = dirtyLogs.max
-        inProgress += filthiest.topicPartition
-        Some(filthiest)
-      }
-    }
+  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
+    cleanerManager.abortAndPauseCleaning(topicAndPartition)
   }
-  
+
   /**
-   * Save out the endOffset and remove the given log from the in-progress set.
+   *  Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
    */
-  private def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
-    lock synchronized {
-      val checkpoint = checkpoints(dataDir)
-      val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
-      checkpoint.write(offsets)
-      inProgress -= topicAndPartition
-    }
-    cleaned.release()
+  def resumeCleaning(topicAndPartition: TopicAndPartition) {
+    cleanerManager.resumeCleaning(topicAndPartition)
   }
 
   /**
+   * TODO: honor the offset and timeout parameters; both are currently ignored by the polling loop below.
+   * For testing, a way to know when work has completed. This method blocks until the 
+   * cleaner has processed up to the given offset on the specified topic/partition
+   */
+  def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
+    while(!cleanerManager.allCleanerCheckpoints.contains(TopicAndPartition(topic, part)))
+      Thread.sleep(10)
+  }
+  
+  /**
    * The cleaner threads do the actual log cleaning. Each thread processes does its cleaning repeatedly by
    * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
    */
-  private class CleanerThread extends Thread {
+  private class CleanerThread(threadId: Int)
+    extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
     if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
       warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
-    val cleaner = new Cleaner(id = threadId.getAndIncrement(),
+
+    val cleaner = new Cleaner(id = threadId,
                               offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, 
                                                               hashAlgorithm = config.hashAlgorithm),
                               ioBufferSize = config.ioBufferSize / config.numThreads / 2,
                               maxIoBufferSize = config.maxMessageSize,
                               dupBufferLoadFactor = config.dedupeBufferLoadFactor,
                               throttler = throttler,
-                              time = time)
+                              time = time,
+                              checkDone = checkDone)
     
-    setName("kafka-log-cleaner-thread-" + cleaner.id)
-    setDaemon(false)
+    private def checkDone(topicAndPartition: TopicAndPartition) {
+      if (!isRunning.get())
+        throw new ThreadShutdownException
+      cleanerManager.checkCleaningAborted(topicAndPartition)
+    }
 
     /**
      * The main loop for the cleaner thread
      */
-    override def run() {
-      info("Starting cleaner thread %d...".format(cleaner.id))
-      try {
-        while(!isInterrupted) {
-          cleanOrSleep()
-        }
-      } catch {
-        case e: InterruptedException => // all done
-        case e: Exception =>
-          error("Error in cleaner thread %d:".format(cleaner.id), e)
-      }
-      info("Shutting down cleaner thread %d.".format(cleaner.id))
+    override def doWork() {
+      cleanOrSleep()
     }
     
     /**
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
     private def cleanOrSleep() {
-      grabFilthiestLog() match {
+      cleanerManager.grabFilthiestLog() match {
         case None =>
           // there are no cleanable logs, sleep a while
           time.sleep(config.backOffMs)
@@ -211,10 +172,9 @@ class LogCleaner(val config: CleanerConfig,
             endOffset = cleaner.clean(cleanable)
             logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
           } catch {
-            case e: OptimisticLockFailureException => 
-              info("Cleaning of log was aborted due to colliding truncate operation.")
+            case pe: LogCleaningAbortedException => // task can be aborted, let it go.
           } finally {
-            doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
+            cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
           }
       }
     }
@@ -260,7 +220,8 @@ private[log] class Cleaner(val id: Int,
                            maxIoBufferSize: Int,
                            dupBufferLoadFactor: Double,
                            throttler: Throttler,
-                           time: Time) extends Logging {
+                           time: Time,
+                           checkDone: (TopicAndPartition) => Unit) extends Logging {
 
   this.logIdent = "Cleaner " + id + ": "
   
@@ -284,8 +245,7 @@ private[log] class Cleaner(val id: Int,
     stats.clear()
     info("Beginning cleaning of log %s.".format(cleanable.log.name))
     val log = cleanable.log
-    val truncateCount = log.numberOfTruncates
-    
+
     // build the offset map
     info("Building offset map for %s...".format(cleanable.log.name))
     val upperBoundOffset = log.activeSegment.baseOffset
@@ -303,7 +263,7 @@ private[log] class Cleaner(val id: Int,
     // group the segments and clean the groups
     info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
     for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
-      cleanSegments(log, group, offsetMap, truncateCount, deleteHorizonMs)
+      cleanSegments(log, group, offsetMap, deleteHorizonMs)
     
     stats.allDone()
     endOffset
@@ -318,10 +278,9 @@ private[log] class Cleaner(val id: Int,
    * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
    * @param deleteHorizonMs The time to retain delete tombstones
    */
-  private[log] def cleanSegments(log: Log, 
+  private[log] def cleanSegments(log: Log,
                                  segments: Seq[LogSegment], 
                                  map: OffsetMap, 
-                                 expectedTruncateCount: Int, 
                                  deleteHorizonMs: Long) {
     // create a new segment with the suffix .cleaned appended to both the log and index name
     val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
@@ -332,32 +291,32 @@ private[log] class Cleaner(val id: Int,
     val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
     val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)
 
-    // clean segments into the new destination segment
-    for (old <- segments) {
-      val retainDeletes = old.lastModified > deleteHorizonMs
-      info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
-          .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-      cleanInto(old, cleaned, map, retainDeletes)
-    }
-      
-    // trim excess index
-    index.trimToValidSize()
-    
-    // flush new segment to disk before swap
-    cleaned.flush()
-    
-    // update the modification date to retain the last modified date of the original files
-    val modified = segments.last.lastModified
-    cleaned.lastModified = modified
-
-    // swap in new segment  
-    info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
     try {
-      log.replaceSegments(cleaned, segments, expectedTruncateCount)
+      // clean segments into the new destination segment
+      for (old <- segments) {
+        val retainDeletes = old.lastModified > deleteHorizonMs
+        info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
+            .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
+        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes)
+      }
+
+      // trim excess index
+      index.trimToValidSize()
+
+      // flush new segment to disk before swap
+      cleaned.flush()
+
+      // update the modification date to retain the last modified date of the original files
+      val modified = segments.last.lastModified
+      cleaned.lastModified = modified
+
+      // swap in new segment
+      info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
+      log.replaceSegments(cleaned, segments)
     } catch {
-      case e: OptimisticLockFailureException =>
+      case e: LogCleaningAbortedException =>
         cleaned.delete()
-        throw e  
+        throw  e
     }
   }
 
@@ -372,10 +331,11 @@ private[log] class Cleaner(val id: Int,
    *
    * TODO: Implement proper compression support
    */
-  private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
+  private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
+                             dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
     var position = 0
     while (position < source.log.sizeInBytes) {
-      checkDone()
+      checkDone(topicAndPartition)
       // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
       readBuffer.clear()
       writeBuffer.clear()
@@ -491,9 +451,9 @@ private[log] class Cleaner(val id: Int,
     require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
     val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
     for (segment <- dirty) {
-      checkDone()
+      checkDone(log.topicAndPartition)
       if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
-        offset = buildOffsetMap(segment, map)
+        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
@@ -507,11 +467,11 @@ private[log] class Cleaner(val id: Int,
    *
    * @return The final offset covered by the map
    */
-  private def buildOffsetMap(segment: LogSegment, map: OffsetMap): Long = {
+  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
     var position = 0
     var offset = segment.baseOffset
     while (position < segment.log.sizeInBytes) {
-      checkDone()
+      checkDone(topicAndPartition)
       readBuffer.clear()
       val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
       throttler.maybeThrottle(messages.sizeInBytes)
@@ -532,14 +492,6 @@ private[log] class Cleaner(val id: Int,
     restoreBuffers()
     offset
   }
-
-  /**
-   * If we aren't running any more throw an AllDoneException
-   */
-  private def checkDone() {
-    if (Thread.currentThread.isInterrupted)
-      throw new InterruptedException
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
new file mode 100644
index 0000000..1612c8d
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import kafka.utils.{Logging, Pool}
+import kafka.server.OffsetCheckpoint
+import collection.mutable
+import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.Utils._
+import java.util.concurrent.TimeUnit
+import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
+
+private[log] sealed trait LogCleaningState
+private[log] case object LogCleaningInProgress extends LogCleaningState
+private[log] case object LogCleaningAborted extends LogCleaningState
+private[log] case object LogCleaningPaused extends LogCleaningState
+
+/**
+ *  Manage the state of each partition being cleaned.
+ *  If a partition is to be cleaned, it enters the LogCleaningInProgress state.
+ *  While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters
+ *  the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state.
+ *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
+ *  requested to be resumed.
+ */
+private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
+  /* the offset checkpoints holding the last cleaned point for each log */
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+
+  /* the cleaning state of each partition that is being cleaned, being aborted, or paused */
+  private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
+
+  /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+  private val lock = new ReentrantLock
+  /* for coordinating the pausing and the cleaning of a partition */
+  private val pausedCleaningCond = lock.newCondition()
+
+  /**
+   * @return the position processed for all logs.
+   */
+  def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
+    checkpoints.values.flatMap(_.read()).toMap
+
+  /**
+   * Choose the log to clean next and add it to the in-progress set. We recompute this
+   * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
+   * the log manager maintains.
+   * @return the chosen log, or None if no log currently qualifies for cleaning
+   */
+  def grabFilthiestLog(): Option[LogToClean] = {
+    inLock(lock) {
+      val lastClean = allCleanerCheckpoints()
+      val cleanableLogs = logs.filter(l => l._2.config.dedupe)                                     // skip any logs marked for delete rather than dedupe
+                              .filterNot(l => inProgress.contains(l._1))                           // skip any logs already in-progress
+                              .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0L)))     // create a LogToClean instance for each
+      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)                                  // must have some bytes
+                                   .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      if(dirtyLogs.isEmpty) {
+        None
+      } else {
+        val filthiest = dirtyLogs.max
+        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
+        Some(filthiest)
+      }
+    }
+  }
+
+  /**
+   *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
+   *  the partition is aborted.
+   *  This is implemented by first aborting and pausing, and then resuming, the cleaning of the partition.
+   *  Throws an IllegalStateException if the partition is already being aborted or is paused.
+   */
+  def abortCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      abortAndPauseCleaning(topicAndPartition)
+      resumeCleaning(topicAndPartition)
+      info("The cleaning for partition %s is aborted".format(topicAndPartition))
+    }
+  }
+
+  /**
+   *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
+   *  This call blocks until the cleaning of the partition is aborted and paused.
+   *  1. If the partition is not in progress, mark it as paused.
+   *  2. Otherwise, first mark the state of the partition as aborted.
+   *  3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
+   *     throws a LogCleaningAbortedException to stop the cleaning task.
+   *  4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
+   *  5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
+   *  Throws an IllegalStateException if the partition is already in the aborted or the paused state.
+   */
+  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      inProgress.get(topicAndPartition) match {
+        case None =>
+          inProgress.put(topicAndPartition, LogCleaningPaused)
+        case Some(LogCleaningInProgress) =>
+          inProgress.put(topicAndPartition, LogCleaningAborted)
+        case Some(s) =>
+          throw new IllegalStateException("Partition %s can't be aborted and paused since it's in %s state".format(topicAndPartition, s))
+      }
+      // doneCleaning() signals this condition when it pauses the partition; the state is re-checked at least every 100 ms
+      while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
+        pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
+      info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
+    }
+  }
+
+  /**
+   *  Resume the cleaning of a paused partition by removing it from the in-progress set, which makes it
+   *  eligible to be picked by grabFilthiestLog() again.
+   *  Throws an IllegalStateException if the partition is not in the paused state.
+   */
+  def resumeCleaning(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      inProgress.get(topicAndPartition) match {
+        case None =>
+          throw new IllegalStateException("Partition %s can't be resumed since it's never paused".format(topicAndPartition))
+        case Some(LogCleaningPaused) =>
+          inProgress.remove(topicAndPartition)
+        case Some(s) =>
+          throw new IllegalStateException("Partition %s can't be resumed since it's in %s state".format(topicAndPartition, s))
+      }
+    }
+    info("The cleaning for partition %s is resumed".format(topicAndPartition))
+  }
+
+  /**
+   *  Check if the cleaning for a partition is in a particular state. The caller is expected to hold lock while making the call.
+   *  @return true only if the partition is tracked in the in-progress set and its state equals expectedState;
+   *          false if it is not tracked at all or is in any other state
+   */
+  def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean =
+    inProgress.get(topicAndPartition).exists(_ == expectedState)
+
+  /**
+   *  Check if the cleaning for a partition is aborted. If so, throw an exception.
+   *  The cleaner thread is expected to call this periodically while it cleans (see abortAndPauseCleaning()).
+   *  Throws a LogCleaningAbortedException if the partition is in the LogCleaningAborted state.
+   */
+  def checkCleaningAborted(topicAndPartition: TopicAndPartition) {
+    inLock(lock) {
+      if (isCleaningInState(topicAndPartition, LogCleaningAborted))
+        throw new LogCleaningAbortedException()
+    }
+  }
+
+  /**
+   * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
+   *  - LogCleaningInProgress: record endOffset in the cleaner checkpoint of dataDir and stop tracking the partition.
+   *  - LogCleaningAborted: mark the partition as paused and wake up the threads waiting in abortAndPauseCleaning().
+   *  - any other state: throw an IllegalStateException.
+   * The partition must be in the in-progress set; otherwise the map lookup throws a NoSuchElementException.
+   */
+  def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
+    inLock(lock) {
+      inProgress(topicAndPartition) match {
+        case LogCleaningInProgress =>
+          val checkpoint = checkpoints(dataDir)
+          val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
+          checkpoint.write(offsets)
+          inProgress.remove(topicAndPartition)
+        case LogCleaningAborted =>
+          inProgress.put(topicAndPartition, LogCleaningPaused)
+          pausedCleaningCond.signalAll()
+        case s =>
+          throw new IllegalStateException("In-progress partition %s can't be in %s state".format(topicAndPartition, s))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 81be88a..10062af 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,10 +22,8 @@ import java.util.concurrent.TimeUnit
 import kafka.utils._
 import scala.collection._
 import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.server.KafkaConfig
 import kafka.server.OffsetCheckpoint
 
-
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
  * All read and write operations are delegated to the individual log instances.
@@ -50,9 +48,9 @@ class LogManager(val logDirs: Array[File],
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000
-  private val logCreationLock = new Object
+  private val logCreationOrDeletionLock = new Object
   private val logs = new Pool[TopicAndPartition, Log]()
-  
+
   createAndValidateLogDirs(logDirs)
   private var dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
@@ -115,7 +113,7 @@ class LogManager(val logDirs: Array[File],
         for(dir <- subDirs) {
           if(dir.isDirectory) {
             info("Loading log '" + dir.getName + "'")
-            val topicPartition = parseTopicPartitionName(dir.getName)
+            val topicPartition = Log.parseTopicPartitionName(dir.getName)
             val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
             val log = new Log(dir, 
                               config,
@@ -194,12 +192,36 @@ class LogManager(val logDirs: Array[File],
       val log = logs.get(topicAndPartition)
       // If the log does not exist, skip it
       if (log != null) {
+        //May need to abort and pause the cleaning of the log, and resume after truncation is done.
+        val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
+        if (needToStopCleaner && cleaner != null)
+          cleaner.abortAndPauseCleaning(topicAndPartition)
         log.truncateTo(truncateOffset)
+        if (needToStopCleaner && cleaner != null)
+          cleaner.resumeCleaning(topicAndPartition)
       }
     }
     checkpointRecoveryPointOffsets()
   }
-  
+
+  /**
+   *  Delete all data in a partition and start the log at the new offset
+   *  @param newOffset The new offset to start the log with
+   */
+  def truncateFullyAndStartAt(topicAndPartition: TopicAndPartition, newOffset: Long) {
+    val log = logs.get(topicAndPartition)
+    // If the log does not exist, skip it
+    if (log != null) {
+      // Abort and pause the cleaning of the log before truncating it.
+      if (cleaner != null)
+        cleaner.abortAndPauseCleaning(topicAndPartition)
+      // Resume in a finally block: if the truncation throws, the partition must not stay paused in the cleaner.
+      try log.truncateFullyAndStartAt(newOffset)
+      finally if (cleaner != null) cleaner.resumeCleaning(topicAndPartition)
+    }
+    checkpointRecoveryPointOffsets()
+  }
+
   /**
    * Write out the current recovery point for all logs to a text file in the log directory 
    * to avoid recovering the whole log on startup.
@@ -229,7 +251,7 @@ class LogManager(val logDirs: Array[File],
    * If the log already exists, just return a copy of the existing log
    */
   def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
-    logCreationLock synchronized {
+    logCreationOrDeletionLock synchronized {
       var log = logs.get(topicAndPartition)
       
       // check if the log has already been created in another thread
@@ -254,7 +276,27 @@ class LogManager(val logDirs: Array[File],
       log
     }
   }
-  
+
+  /**
+   *  Delete a log.
+   */
+  def deleteLog(topicAndPartition: TopicAndPartition) {
+    // detach the log from the pool while holding the creation/deletion lock
+    val removedLog: Log = logCreationOrDeletionLock synchronized {
+      logs.remove(topicAndPartition)
+    }
+    if (removedLog != null) {
+      // wait until no cleaning task is running on this log before its files are removed
+      if (cleaner != null)
+        cleaner.abortCleaning(topicAndPartition)
+      removedLog.delete()
+      info("Deleted log for partition [%s,%d] in %s.".format(
+        topicAndPartition.topic,
+        topicAndPartition.partition,
+        removedLog.dir.getAbsolutePath))
+    }
+  }
+
   /**
    * Choose the next directory in which to create a log. Currently this is done
    * by calculating the number of partitions in each directory and then choosing the
@@ -280,7 +322,6 @@ class LogManager(val logDirs: Array[File],
    */
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
-    val topic = parseTopicPartitionName(log.name).topic
     log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
   }
 
@@ -289,7 +330,6 @@ class LogManager(val logDirs: Array[File],
    *  is at least logRetentionSize bytes in size
    */
   private def cleanupSegmentsToMaintainSize(log: Log): Int = {
-    val topic = parseTopicPartitionName(log.dir.getName).topic
     if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
       return 0
     var diff = log.size - log.config.retentionSize
@@ -334,6 +374,7 @@ class LogManager(val logDirs: Array[File],
    */
   private def flushDirtyLogs() = {
     debug("Checking for dirty logs to flush...")
+
     for ((topicAndPartition, log) <- logs) {
       try {
         val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
@@ -344,22 +385,7 @@ class LogManager(val logDirs: Array[File],
       } catch {
         case e: Throwable =>
           error("Error flushing topic " + topicAndPartition.topic, e)
-          e match {
-            case _: IOException =>
-              fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
-              System.exit(1)
-            case _ =>
-          }
       }
     }
   }
-
-  /**
-   * Parse the topic and partition out of the directory name of a log
-   */
-  private def parseTopicPartitionName(name: String): TopicAndPartition = {
-    val index = name.lastIndexOf('-')
-    TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 715845b..73e605e 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -68,7 +68,6 @@ class ReplicaFetcherThread(name:String,
    */
   def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
     val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
-    val log = replica.log.get
 
     /**
      * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
@@ -81,8 +80,8 @@ class ReplicaFetcherThread(name:String,
      * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
      */
     val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
-    if (leaderEndOffset < log.logEndOffset) {
-      log.truncateTo(leaderEndOffset)
+    if (leaderEndOffset < replica.logEndOffset) {
+      replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
       warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d"
         .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset))
       leaderEndOffset
@@ -94,7 +93,7 @@ class ReplicaFetcherThread(name:String,
        * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
        */
       val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
-      log.truncateFullyAndStartAt(leaderStartOffset)
+      replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
       warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
         .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
       leaderStartOffset

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 242c18d..f9d10d3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common._
 import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
-import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
+import kafka.controller.KafkaController
 import org.apache.log4j.Logger
 
 
@@ -116,16 +116,16 @@ class ReplicaManager(val config: KafkaConfig,
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
     stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
     val errorCode = ErrorMapping.NoError
-    getReplica(topic, partitionId) match {
-      case Some(replica) =>
-        /* TODO: handle deleteLog in a better way */
-        //if (deletePartition)
-        //  logManager.deleteLog(topic, partition)
+    getPartition(topic, partitionId) match {
+      case Some(partition) =>
         leaderPartitionsLock synchronized {
-          leaderPartitions -= replica.partition
+          leaderPartitions -= partition
+        }
+        if(deletePartition) {
+          val removedPartition = allPartitions.remove((topic, partitionId))
+          if (removedPartition != null)
+            removedPartition.delete() // this will delete the local log
         }
-        if(deletePartition)
-          allPartitions.remove((topic, partitionId))
       case None => //do nothing if replica no longer exists
     }
     stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 52d35a3..59de1b4 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -23,9 +23,10 @@ import java.util.Properties
 import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{KafkaServer, KafkaConfig}
 import kafka.utils.{Logging, ZkUtils, TestUtils}
-import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicExistsException, TopicAndPartition}
+import kafka.server.{KafkaServer, KafkaConfig}
+import java.io.File
 
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@@ -132,6 +133,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
   }
 
+  private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], topic: String, partitionId: Int): Set[Int] = {
+    val partitionDirName = topic + "-" + partitionId
+    def hostsPartition(server: KafkaServer) = new File(server.config.logDirs.head, partitionDirName).exists
+    servers.collect { case server if hostsPartition(server) => server.config.brokerId }.toSet
+  }
+
   @Test
   def testPartitionReassignmentWithLeaderInNewReplicas() {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
@@ -157,6 +164,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
     servers.foreach(_.shutdown())
   }
 
@@ -184,6 +192,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
     servers.foreach(_.shutdown())
   }
 
@@ -211,6 +220,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
     servers.foreach(_.shutdown())
   }
 
@@ -251,6 +261,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
     // ensure that there are no under replicated partitions
     ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+    assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
     servers.foreach(_.shutdown())
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f63e3f73/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 5a312bf..51cd94b 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Test}
+import org.junit.{After, Test}
 import java.nio._
 import java.io.File
 import scala.collection._
@@ -62,7 +62,7 @@ class CleanerTest extends JUnitSuite {
     keys.foreach(k => map.put(key(k), Long.MaxValue))
     
     // clean the log
-    cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L)
+    cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L)
     val shouldRemain = keysInLog(log).filter(!keys.contains(_))
     assertEquals(shouldRemain, keysInLog(log))
   }
@@ -94,29 +94,31 @@ class CleanerTest extends JUnitSuite {
   /* extract all the keys from a log */
   def keysInLog(log: Log): Iterable[Int] = 
     log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt))
-  
-  
+
+  def abortCheckDone(topicAndPartition: TopicAndPartition) {
+    throw new LogCleaningAbortedException()  // always signal an abort, whichever partition is being checked
+  }
+
   /**
-   * Test that a truncation during cleaning throws an OptimisticLockFailureException
+   * Test that abortion during cleaning throws a LogCleaningAbortedException
    */
   @Test
-  def testCleanSegmentsWithTruncation() {
-    val cleaner = makeCleaner(Int.MaxValue)
+  def testCleanSegmentsWithAbort() {
+    val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
     val log = makeLog(config = logConfig.copy(segmentSize = 1024))
-    
+
     // append messages to the log until we have four segments
-    while(log.numberOfSegments < 2)
+    while(log.numberOfSegments < 4)
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-      
-    log.truncateTo(log.logEndOffset-2)
+
     val keys = keysInLog(log)
     val map = new FakeOffsetMap(Int.MaxValue)
     keys.foreach(k => map.put(key(k), Long.MaxValue))
-    intercept[OptimisticLockFailureException] {
-      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L)
+    intercept[LogCleaningAbortedException] {
+      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L)
     }
   }
-  
+
   /**
    * Validate the logic for grouping log segments together for cleaning
    */
@@ -196,15 +198,18 @@ class CleanerTest extends JUnitSuite {
   
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
     new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
-  
-  def makeCleaner(capacity: Int) = 
+
+  def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing: the default checkDone callback of makeCleaner never aborts */  }
+
+  def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone) =
     new Cleaner(id = 0, 
                 offsetMap = new FakeOffsetMap(capacity), 
                 ioBufferSize = 64*1024, 
                 maxIoBufferSize = 64*1024,
                 dupBufferLoadFactor = 0.75,                
                 throttler = throttler, 
-                time = time)
+                time = time,
+                checkDone = checkDone )
   
   def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
     for((key, value) <- seq)