Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/17 15:35:24 UTC

[3/4] kafka git commit: KAFKA-5163; Support replicas movement between log directories (KIP-113)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 6da494e..0c92c71 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -70,7 +70,11 @@ class LogManager(logDirs: Seq[File],
   val InitialTaskDelayMs = 30 * 1000
 
   private val logCreationOrDeletionLock = new Object
-  private val logs = new Pool[TopicPartition, Log]()
+  private val currentLogs = new Pool[TopicPartition, Log]()
+  // Future logs are placed in a directory with the "-future" suffix. A future log is created when the user wants to move
+  // a replica from one log directory to another on the same broker. The directory of the future log is renamed to replace
+  // the current log of the partition once the future log has caught up with the current log.
+  private val futureLogs = new Pool[TopicPartition, Log]()
   private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
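
For orientation, a minimal hypothetical sketch of the bookkeeping the two pools introduce (not part of the patch): the same TopicPartition key can sit in both pools while a move is in progress. Only the "-future" suffix comes from the comment above; the topic name, disk paths and exact directory names below are invented for the example.

    import kafka.utils.Pool
    import org.apache.kafka.common.TopicPartition

    // Illustration only: while "foo-0" is being moved from /disk1 to /disk2, the current copy keeps
    // serving the partition and the future copy catches up under a directory ending in "-future".
    object CurrentAndFutureLogsSketch extends App {
      val tp = new TopicPartition("foo", 0)
      val currentLogDirs = new Pool[TopicPartition, String]()
      val futureLogDirs = new Pool[TopicPartition, String]()

      currentLogDirs.put(tp, "/disk1/foo-0")
      futureLogDirs.put(tp, "/disk2/foo-0.0123456789ab-future")

      println(s"current: ${currentLogDirs.get(tp)}")
      println(s"future:  ${futureLogDirs.get(tp)}")
    }
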
@@ -102,7 +106,7 @@ class LogManager(logDirs: Seq[File],
   // public, so we can access this from kafka.admin.DeleteTopicTest
   val cleaner: LogCleaner =
     if(cleanerConfig.enableCleaner)
-      new LogCleaner(cleanerConfig, liveLogDirs, logs, logDirFailureChannel, time = time)
+      new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
     else
       null
 
@@ -179,18 +183,30 @@ class LogManager(logDirs: Seq[File],
       if (cleaner != null)
         cleaner.handleLogDirFailure(dir)
 
-      val offlineTopicPartitions = logs.collect {
+      val offlineCurrentTopicPartitions = currentLogs.collect {
         case (tp, log) if log.dir.getParent == dir => tp
       }
-
-      offlineTopicPartitions.foreach { topicPartition =>
-        val removedLog = logs.remove(topicPartition)
+      offlineCurrentTopicPartitions.foreach { topicPartition => {
+        val removedLog = currentLogs.remove(topicPartition)
         if (removedLog != null) {
           removedLog.closeHandlers()
           removedLog.removeLogMetrics()
         }
+      }}
+
+      val offlineFutureTopicPartitions = futureLogs.collect {
+        case (tp, log) if log.dir.getParent == dir => tp
       }
-      info(s"Partitions ${offlineTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
+      offlineFutureTopicPartitions.foreach { topicPartition => {
+        val removedLog = futureLogs.remove(topicPartition)
+        if (removedLog != null) {
+          removedLog.closeHandlers()
+          removedLog.removeLogMetrics()
+        }
+      }}
+
+      info(s"Logs for partitions ${offlineCurrentTopicPartitions.mkString(",")} and future logs for partitions " +
+           s"${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
       dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy()))
     }
   }
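
The two removal loops above differ only in the pool they drain. As a sketch (a hypothetical helper inside LogManager, not something this patch adds), the shared pattern is:

    // Hypothetical helper: remove every log that lives under the failed directory from the given
    // pool, close its file handlers and drop its metrics, and return the affected partitions.
    private def removeOfflineLogs(pool: Pool[TopicPartition, Log], dir: String): Iterable[TopicPartition] = {
      val offlinePartitions = pool.collect { case (tp, log) if log.dir.getParent == dir => tp }
      offlinePartitions.foreach { tp =>
        val removedLog = pool.remove(tp)
        if (removedLog != null) {
          removedLog.closeHandlers()
          removedLog.removeLogMetrics()
        }
      }
      offlinePartitions
    }
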
@@ -221,7 +237,7 @@ class LogManager(logDirs: Seq[File],
     val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
     val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
 
-    val current = Log(
+    val log = Log(
       dir = logDir,
       config = config,
       logStartOffset = logStartOffset,
@@ -234,13 +250,22 @@ class LogManager(logDirs: Seq[File],
       logDirFailureChannel = logDirFailureChannel)
 
     if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
-      this.logsToBeDeleted.add(current)
+      this.logsToBeDeleted.add(log)
     } else {
-      val previous = this.logs.put(topicPartition, current)
+      val previous = {
+        if (log.isFuture)
+          this.futureLogs.put(topicPartition, log)
+        else
+          this.currentLogs.put(topicPartition, log)
+      }
       if (previous != null) {
-        throw new IllegalArgumentException(
-          "Duplicate log directories found: %s, %s!".format(
-            current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+        if (log.isFuture)
+          throw new IllegalStateException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+        else
+          throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " +
+            s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " +
+            s"replacing current replica with future replica. Recover broker from this failure by manually deleting one of the two directories " +
+            s"for this partition. It is recommended to delete the partition in the log directory that is known to have failed recently.")
       }
     }
   }
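
Restated as a free-standing sketch (an assumption made for clarity, not code from the patch), loadLog now routes a recovered log by its directory name and by Log.isFuture:

    // Where a recovered log ends up: queued for deletion, in futureLogs, or in currentLogs.
    def destinationPool(log: Log): String =
      if (log.dir.getName.endsWith(Log.DeleteDirSuffix)) "logsToBeDeleted"
      else if (log.isFuture) "futureLogs"
      else "currentLogs"
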
@@ -252,7 +277,7 @@ class LogManager(logDirs: Seq[File],
     info("Loading logs.")
     val startMs = time.milliseconds
     val threadPools = ArrayBuffer.empty[ExecutorService]
-    val offlineDirs = ArrayBuffer.empty[(String, IOException)]
+    val offlineDirs = mutable.Set.empty[(String, IOException)]
     val jobs = mutable.Map.empty[File, Seq[Future[_]]]
 
     for (dir <- liveLogDirs) {
@@ -295,7 +320,7 @@ class LogManager(logDirs: Seq[File],
               loadLog(logDir, recoveryPoints, logStartOffsets)
             } catch {
               case e: IOException =>
-                offlineDirs.append((dir.getAbsolutePath, e))
+                offlineDirs.add((dir.getAbsolutePath, e))
                 error("Error while loading log dir " + dir.getAbsolutePath, e)
             }
           }
@@ -303,7 +328,7 @@ class LogManager(logDirs: Seq[File],
         jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
       } catch {
         case e: IOException =>
-          offlineDirs.append((dir.getAbsolutePath, e))
+          offlineDirs.add((dir.getAbsolutePath, e))
           error("Error while loading log dir " + dir.getAbsolutePath, e)
       }
     }
@@ -315,10 +340,11 @@ class LogManager(logDirs: Seq[File],
           cleanShutdownFile.delete()
         } catch {
           case e: IOException =>
-            offlineDirs.append((cleanShutdownFile.getParent, e))
+            offlineDirs.add((cleanShutdownFile.getParent, e))
             error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
         }
       }
+
       offlineDirs.foreach { case (dir, e) =>
         logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
       }
@@ -442,24 +468,30 @@ class LogManager(logDirs: Seq[File],
    * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
    *
    * @param partitionOffsets Partition logs that need to be truncated
+   * @param isFuture True iff the truncation should be performed on the future log of the specified partitions
    */
-  def truncateTo(partitionOffsets: Map[TopicPartition, Long]) {
+  def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean) {
     var truncated = false
     for ((topicPartition, truncateOffset) <- partitionOffsets) {
-      val log = logs.get(topicPartition)
+      val log = {
+        if (isFuture)
+          futureLogs.get(topicPartition)
+        else
+          currentLogs.get(topicPartition)
+      }
       // If the log does not exist, skip it
       if (log != null) {
         //May need to abort and pause the cleaning of the log, and resume after truncation is done.
         val needToStopCleaner = cleaner != null && truncateOffset < log.activeSegment.baseOffset
-        if (needToStopCleaner)
+        if (needToStopCleaner && !isFuture)
           cleaner.abortAndPauseCleaning(topicPartition)
         try {
           if (log.truncateTo(truncateOffset))
             truncated = true
-          if (needToStopCleaner)
+          if (needToStopCleaner && !isFuture)
             cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
         } finally {
-          if (needToStopCleaner)
+          if (needToStopCleaner && !isFuture)
             cleaner.resumeCleaning(topicPartition)
         }
       }
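
A usage sketch of the extended signature (`logManager` and the topic name are assumed for the example; the real call site is not shown in this excerpt):

    // Truncate only the future copy of foo-0 to offset 100. The cleaner bookkeeping is skipped when
    // isFuture is true, as the checks above show, because the cleaner only tracks current logs.
    logManager.truncateTo(Map(new TopicPartition("foo", 0) -> 100L), isFuture = true)
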
@@ -470,24 +502,31 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
-   *  Delete all data in a partition and start the log at the new offset
+   * Delete all data in a partition and start the log at the new offset
    *
-   *  @param newOffset The new offset to start the log with
+   * @param topicPartition The partition whose log needs to be truncated
+   * @param newOffset The new offset to start the log with
+   * @param isFuture True iff the truncation should be performed on the future log of the specified partition
    */
-  def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long) {
-    val log = logs.get(topicPartition)
+  def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long, isFuture: Boolean) {
+    val log = {
+      if (isFuture)
+        futureLogs.get(topicPartition)
+      else
+        currentLogs.get(topicPartition)
+    }
     // If the log does not exist, skip it
     if (log != null) {
         //Abort and pause the cleaning of the log, and resume after truncation is done.
-      if (cleaner != null)
+      if (cleaner != null && !isFuture)
         cleaner.abortAndPauseCleaning(topicPartition)
       log.truncateFullyAndStartAt(newOffset)
-      if (cleaner != null) {
+      if (cleaner != null && !isFuture) {
         cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
         cleaner.resumeCleaning(topicPartition)
       }
+      checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
     }
-    checkpointLogRecoveryOffsets()
   }
 
   /**
@@ -516,7 +555,7 @@ class LogManager(logDirs: Seq[File],
     } {
       try {
         checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
-        logs.values.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
+        allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
       } catch {
         case e: IOException =>
           logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
@@ -545,33 +584,61 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
-  def updatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
-    // The logDir should be an absolute path
-    preferredLogDirs.put(topicPartition, logDir)
+  // The logDir should be an absolute path
+  def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
+    // Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir
+    if (!getLog(topicPartition).exists(_.dir.getParent == logDir) &&
+        !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir))
+      preferredLogDirs.put(topicPartition, logDir)
+  }
+
+  def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
+    if (cleaner != null)
+      cleaner.abortAndPauseCleaning(topicPartition)
   }
 
+
   /**
    * Get the log if it exists, otherwise return None
+   *
+   * @param topicPartition the partition of the log
+   * @param isFuture True iff the future log of the specified partition should be returned
    */
-  def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
+  def getLog(topicPartition: TopicPartition, isFuture: Boolean = false): Option[Log] = {
+    if (isFuture)
+      Option(futureLogs.get(topicPartition))
+    else
+      Option(currentLogs.get(topicPartition))
+  }
 
   /**
    * If the log already exists, just return a copy of the existing log
    * Otherwise if isNew=true or if there is no offline log directory, create a log for the given topic and the given partition
    * Otherwise throw KafkaStorageException
    *
+   * @param topicPartition The partition whose log needs to be returned or created
+   * @param config The configuration of the log that should be applied for log creation
    * @param isNew Whether the replica should have existed on the broker or not
+   * @param isFuture True iff the future log of the specified partition should be returned or created
    * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker
    */
-  def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false): Log = {
+  def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
     logCreationOrDeletionLock synchronized {
-      getLog(topicPartition).getOrElse {
+      getLog(topicPartition, isFuture).getOrElse {
         // create the log if it has not already been created in another thread
         if (!isNew && offlineLogDirs.nonEmpty)
           throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
 
         val logDir = {
           val preferredLogDir = preferredLogDirs.get(topicPartition)
+
+          if (isFuture) {
+            if (preferredLogDir == null)
+              throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
+            else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
+              throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
+          }
+
           if (preferredLogDir != null)
             preferredLogDir
           else
@@ -581,7 +648,12 @@ class LogManager(logDirs: Seq[File],
           throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline")
 
         try {
-          val dir = new File(logDir, topicPartition.topic + "-" + topicPartition.partition)
+          val dir = {
+            if (isFuture)
+              new File(logDir, Log.logFutureDirName(topicPartition))
+            else
+              new File(logDir, Log.logDirName(topicPartition))
+          }
           Files.createDirectories(dir.toPath)
 
           val log = Log(
@@ -596,7 +668,10 @@ class LogManager(logDirs: Seq[File],
             brokerTopicStats = brokerTopicStats,
             logDirFailureChannel = logDirFailureChannel)
 
-          logs.put(topicPartition, log)
+          if (isFuture)
+            futureLogs.put(topicPartition, log)
+          else
+            currentLogs.put(topicPartition, log)
 
           info("Created log for partition [%s,%d] in %s with properties {%s}."
             .format(topicPartition.topic,
@@ -641,44 +716,78 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
+    * Mark the partition directory in the source log directory for deletion and
+    * rename the future log of this partition in the destination log directory to be the current log
+    *
+    * @param topicPartition TopicPartition that needs to be swapped
+    */
+  def replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit = {
+    logCreationOrDeletionLock synchronized {
+      val sourceLog = currentLogs.get(topicPartition)
+      val destLog = futureLogs.get(topicPartition)
+
+      if (sourceLog == null)
+        throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
+      if (destLog == null)
+        throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
+
+      destLog.renameDir(Log.logDirName(topicPartition))
+      // Now that the future replica has been successfully renamed to be the current replica,
+      // update the cached map and the log cleaner as appropriate.
+      futureLogs.remove(topicPartition)
+      currentLogs.put(topicPartition, destLog)
+      if (cleaner != null) {
+        cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile)
+        cleaner.resumeCleaning(topicPartition)
+      }
+
+      try {
+        sourceLog.renameDir(Log.logDeleteDirName(topicPartition))
+        // Now that the replica in the source log directory has been successfully renamed for deletion,
+        // close the log, update the checkpoint files, and enqueue this log to be deleted.
+        sourceLog.close()
+        checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
+        checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
+        logsToBeDeleted.add(sourceLog)
+      } catch {
+        case e: KafkaStorageException =>
+          // If sourceLog's log directory is offline, we need to close its handlers here, because
+          // handleLogDirFailure() will not close them now that sourceLog has been removed from the currentLogs map.
+          sourceLog.closeHandlers()
+          sourceLog.removeLogMetrics()
+          throw e
+      }
+
+      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
+  }
+
+  /**
     * Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
     * add it in the queue for deletion.
     *
     * @param topicPartition TopicPartition that needs to be deleted
+    * @param isFuture True iff the future log of the specified partition should be deleted
     * @return the removed log
     */
-  def asyncDelete(topicPartition: TopicPartition): Log = {
+  def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
     val removedLog: Log = logCreationOrDeletionLock synchronized {
-      logs.remove(topicPartition)
+      if (isFuture)
+        futureLogs.remove(topicPartition)
+      else
+        currentLogs.remove(topicPartition)
     }
     if (removedLog != null) {
-      try {
-        //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
-        if (cleaner != null) {
-          cleaner.abortCleaning(topicPartition)
-          cleaner.updateCheckpoints(removedLog.dir.getParentFile)
-        }
-        val dirName = Log.logDeleteDirName(removedLog.name)
-        removedLog.close()
-        val renamedDir = new File(removedLog.dir.getParent, dirName)
-        val renameSuccessful = removedLog.dir.renameTo(renamedDir)
-        if (renameSuccessful) {
-          checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
-          removedLog.dir = renamedDir
-          // change the file pointers for log and index file
-          removedLog.logSegments.foreach(_.updateDir(renamedDir))
-          logsToBeDeleted.add(removedLog)
-          removedLog.removeLogMetrics()
-          info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
-        } else {
-          throw new IOException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
-        }
-      } catch {
-        case e: IOException =>
-          val msg = s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}."
-          logDirFailureChannel.maybeAddOfflineLogDir(removedLog.dir.getParent, msg, e)
-          throw new KafkaStorageException(msg, e)
+      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
+      if (cleaner != null && !isFuture) {
+        cleaner.abortCleaning(topicPartition)
+        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
+      removedLog.renameDir(Log.logDeleteDirName(topicPartition))
+      checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
+      checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
+      logsToBeDeleted.add(removedLog)
+      info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
     } else if (offlineLogDirs.nonEmpty) {
       throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
     }
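
Taken together, the new LogManager methods support the following intra-broker move flow. This is only a sketch: in the actual patch these calls are spread across ReplicaManager, Partition and the new ReplicaAlterLogDirsThread rather than issued back to back, and `logManager`, `logConfig`, the topic name and the destination path are assumed for the example.

    val tp = new TopicPartition("foo", 0)

    logManager.maybeUpdatePreferredLogDir(tp, "/disk2")        // remember the destination directory
    logManager.abortAndPauseCleaning(tp)                       // stop compaction of the current log
    val futureLog = logManager.getOrCreateLog(tp, logConfig, isFuture = true)  // created under /disk2 with a "-future" suffix

    // ... a fetcher thread copies records from the current log into futureLog until it catches up ...

    logManager.replaceCurrentWithFutureLog(tp)                 // rename the future directory to foo-0 and
                                                               // schedule the old directory for deletion
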
@@ -724,18 +833,21 @@ class LogManager(logDirs: Seq[File],
   /**
    * Get all the partition logs
    */
-  def allLogs: Iterable[Log] = logs.values
+  def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values
 
-  /**
-   * Get a map of TopicPartition => Log
-   */
-  def logsByTopicPartition: Map[TopicPartition, Log] = logs.toMap
+  def logsByTopic(topic: String): Seq[Log] = {
+    (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, log) =>
+      topicPartition.topic() == topic
+    }.map { case (topicPartition, log) => log }
+  }
 
   /**
    * Map of log dir to logs by topic and partitions in that dir
    */
   private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
-    this.logsByTopicPartition.groupBy { case (_, log) => log.dir.getParent }
+    (this.currentLogs.toList ++ this.futureLogs.toList).groupBy {
+      case (_, log) => log.dir.getParent
+    }.mapValues(_.toMap)
   }
 
   // logDir should be an absolute path
@@ -753,7 +865,7 @@ class LogManager(logDirs: Seq[File],
   private def flushDirtyLogs(): Unit = {
     debug("Checking for dirty logs to flush...")
 
-    for ((topicPartition, log) <- logs) {
+    for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
       try {
         val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
         debug("Checking if flush is needed on " + topicPartition.topic + " flush interval  " + log.config.flushMs +

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 63c1f56..da84b19 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -457,7 +457,7 @@ object ProducerStateManager {
  */
 @nonthreadsafe
 class ProducerStateManager(val topicPartition: TopicPartition,
-                           val logDir: File,
+                           @volatile var logDir: File,
                            val maxProducerIdExpirationMs: Int = 60 * 60 * 1000) extends Logging {
   import ProducerStateManager._
   import java.util

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index a8316b4..c385d4f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -69,13 +69,24 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
     Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
   }
 
+  // This method is only needed by ReplicaAlterLogDirsManager
+  def markPartitionsForTruncation(brokerId: Int, topicPartition: TopicPartition, truncationOffset: Long) {
+    mapLock synchronized {
+      val fetcherId = getFetcherId(topicPartition.topic, topicPartition.partition)
+      val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerId, fetcherId)
+      fetcherThreadMap.get(brokerIdAndFetcherId).foreach { thread =>
+        thread.markPartitionsForTruncation(topicPartition, truncationOffset)
+      }
+    }
+  }
+
   // to be defined in subclass to create a specific fetcher
   def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread
 
   def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
     mapLock synchronized {
-      val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>
-        BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
+      val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialFetchOffset) =>
+        BrokerAndFetcherId(brokerAndInitialFetchOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
 
       def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) {
         val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
@@ -83,7 +94,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
         fetcherThread.start
       }
 
-      for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
+      for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
         val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
         fetcherThreadMap.get(brokerIdAndFetcherId) match {
           case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host && f.sourceBroker.port == brokerAndFetcherId.broker.port =>
@@ -95,7 +106,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
             addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
         }
 
-        fetcherThreadMap(brokerIdAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
+        fetcherThreadMap(brokerIdAndFetcherId).addPartitions(initialFetchOffsets.map { case (tp, brokerAndInitOffset) =>
           tp -> brokerAndInitOffset.initOffset
         })
       }
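
A usage sketch of the new hook (`alterLogDirsManager` and `config` are assumed for the example; the broker id passed is the local broker's, since an alter-log-dirs fetcher fetches from the broker itself):

    // Ask the fetcher thread that currently owns foo-0, if any, to truncate the future replica to
    // offset 42 before it resumes fetching.
    alterLogDirsManager.markPartitionsForTruncation(
      brokerId = config.brokerId,
      topicPartition = new TopicPartition("foo", 0),
      truncationOffset = 42L)
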

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index ddf6855..b078073 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -21,11 +21,10 @@ import java.util.concurrent.locks.ReentrantLock
 
 import kafka.cluster.BrokerEndPoint
 import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
 import kafka.common.{ClientIdAndBroker, KafkaException}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
-import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
 
@@ -84,7 +83,7 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
 
-  override def shutdown(){
+  override def shutdown() {
     initiateShutdown()
     inLock(partitionMapLock) {
       partitionMapCond.signalAll()
@@ -132,9 +131,9 @@ abstract class AbstractFetcherThread(name: String,
       inLock(partitionMapLock) {
         //Check no leadership changes happened whilst we were unlocked, fetching epochs
         val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
-        val ResultWithPartitions(truncationPoints, partitionsWithError) = maybeTruncate(leaderEpochs)
+        val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncate(leaderEpochs)
         handlePartitionsWithErrors(partitionsWithError)
-        markTruncationComplete(truncationPoints)
+        markTruncationCompleteAndUpdateFetchOffset(fetchOffsets)
       }
     }
   }
@@ -170,8 +169,10 @@ abstract class AbstractFetcherThread(name: String,
           val topic = topicPartition.topic
           val partitionId = topicPartition.partition
           Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
-            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-            if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset) {
+            // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
+            // In this case, we only want to process the fetch response if the partition state is ready for fetch and the current offset is the same as the offset requested.
+            if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset &&
+                currentPartitionFetchState.isReadyForFetch) {
               partitionData.error match {
                 case Errors.NONE =>
                   try {
@@ -184,7 +185,8 @@ abstract class AbstractFetcherThread(name: String,
                     processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData)
 
                     val validBytes = records.validBytes
-                    if (validBytes > 0) {
+                    // ReplicaAlterLogDirsThread may have removed topicPartition from the partitionStates after processing the partition data
+                    if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                       // Update partitionStates only if there is no exception during processPartitionData
                       partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
                       fetcherStats.byteRate.mark(validBytes)
@@ -195,10 +197,10 @@ abstract class AbstractFetcherThread(name: String,
                       // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                       // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                       // should get fixed in the subsequent fetches
-                      logger.error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset} error ${ime.getMessage}")
+                      error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset}", ime)
                       partitionsWithError += topicPartition
                     case e: KafkaStorageException =>
-                      logger.error(s"Error while processing data for partition $topicPartition", e)
+                      error(s"Error while processing data for partition $topicPartition", e)
                       partitionsWithError += topicPartition
                     case e: Throwable =>
                       throw new KafkaException(s"Error processing data for partition $topicPartition " +
@@ -212,12 +214,12 @@ abstract class AbstractFetcherThread(name: String,
                   } catch {
                     case e: FatalExitError => throw e
                     case e: Throwable =>
-                      error(s"Error getting offset for partition $topicPartition to broker ${sourceBroker.id}", e)
+                      error(s"Error getting offset for partition $topicPartition from broker ${sourceBroker.id}", e)
                       partitionsWithError += topicPartition
                   }
                 case _ =>
                   if (isRunning.get) {
-                    error(s"Error for partition $topicPartition to broker ${sourceBroker.id}:${partitionData.exception.get}")
+                    error(s"Error for partition $topicPartition from broker ${sourceBroker.id}", partitionData.exception.get)
                     partitionsWithError += topicPartition
                   }
               }
@@ -231,18 +233,31 @@ abstract class AbstractFetcherThread(name: String,
     handlePartitionsWithErrors(partitionsWithError)
   }
 
-  def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]) {
+  def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long) {
+    if (!includeLogTruncation)
+      throw new IllegalStateException("Truncation should not be requested if includeLogTruncation is disabled")
+    partitionMapLock.lockInterruptibly()
+    try {
+      Option(partitionStates.stateValue(topicPartition)).foreach { state =>
+        val newState = PartitionFetchState(math.min(truncationOffset, state.fetchOffset), state.delay, truncatingLog = true)
+        partitionStates.updateAndMoveToEnd(topicPartition, newState)
+      }
+      partitionMapCond.signalAll()
+    } finally partitionMapLock.unlock()
+  }
+
+  def addPartitions(initialFetchOffsets: Map[TopicPartition, Long]) {
     partitionMapLock.lockInterruptibly()
     try {
       // If the partitionMap already has the topic/partition, then do not update the map with the old offset
-      val newPartitionToState = partitionAndOffsets.filter { case (tp, _) =>
+      val newPartitionToState = initialFetchOffsets.filter { case (tp, _) =>
         !partitionStates.contains(tp)
-      }.map { case (tp, offset) =>
+      }.map { case (tp, initialFetchOffset) =>
         val fetchState =
-          if (offset < 0)
+          if (initialFetchOffset < 0)
             new PartitionFetchState(handleOffsetOutOfRange(tp), includeLogTruncation)
           else
-            new PartitionFetchState(offset, includeLogTruncation)
+            new PartitionFetchState(initialFetchOffset, includeLogTruncation)
         tp -> fetchState
       }
       val existingPartitionToState = states().toMap
@@ -252,14 +267,14 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-    * Loop through all partitions, marking them as truncation complete and applying the correct offset
+    * Loop through all partitions, marking them as truncation complete and updating the fetch offset
     *
-    * @param partitions the partitions to mark truncation complete
+    * @param fetchOffsets the fetch offset to apply, per partition, once truncation is complete
     */
-  private def markTruncationComplete(partitions: Map[TopicPartition, Long]) {
+  private def markTruncationCompleteAndUpdateFetchOffset(fetchOffsets: Map[TopicPartition, Long]) {
     val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
       .map { state =>
-        val maybeTruncationComplete = partitions.get(state.topicPartition()) match {
+        val maybeTruncationComplete = fetchOffsets.get(state.topicPartition()) match {
           case Some(offset) => PartitionFetchState(offset, state.value.delay, truncatingLog = false)
           case None => state.value()
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index ddeecb0..0edd638 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -52,7 +52,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
     // Validate the configurations.
     val configNamesToExclude = excludedConfigs(topic, topicConfig)
 
-    val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer
+    val logs = logManager.logsByTopic(topic).toBuffer
     if (logs.nonEmpty) {
       /* combine the default properties with the overrides in zk to create the new LogConfig */
       val props = new Properties()
@@ -61,10 +61,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
         if (!configNamesToExclude.contains(key)) props.put(key, value)
       }
       val logConfig = LogConfig(props)
-      if ((topicConfig.containsKey(LogConfig.RetentionMsProp) 
+      if ((topicConfig.containsKey(LogConfig.RetentionMsProp)
         || topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
         && logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
-        warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " + 
+        warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
           s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${logConfig.messageTimestampDifferenceMaxMs}. " +
           s"This may result in frequent log rolling.")
       logs.foreach(_.config = logConfig)
@@ -97,7 +97,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
         .map(_ (0).toInt).toSeq //convert to list of partition ids
     }
   }
-  
+
   def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = {
     // Verify message format version
     Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).flatMap { versionString =>
@@ -189,6 +189,7 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quo
     if (brokerConfig.brokerId == brokerId.trim.toInt) {
       quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp)))
       quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp)))
+      quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp)))
     }
   }
 }
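
The new quota manager wired in above can also be exercised directly. A minimal sketch, assuming a `quotaManagers` instance as built by QuotaFactory (further down in this commit):

    import org.apache.kafka.common.metrics.Quota

    // Throttle intra-broker replica movement to roughly 10 MB/s; this mirrors what the handler
    // above does when the dynamic broker property changes.
    quotaManagers.alterLogDirs.updateQuota(Quota.upperBound(10 * 1024 * 1024))
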

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 175bf62..ddfdff8 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -36,6 +36,7 @@ object DynamicConfig {
     //Properties
     val LeaderReplicationThrottledRateProp = "leader.replication.throttled.rate"
     val FollowerReplicationThrottledRateProp = "follower.replication.throttled.rate"
+    val ReplicaAlterLogDirsIoMaxBytesPerSecondProp = "replica.alter.log.dirs.io.max.bytes.per.second"
 
     //Defaults
     val DefaultReplicationThrottledRate = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
@@ -47,12 +48,15 @@ object DynamicConfig {
     val FollowerReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec) on replication traffic for followers enumerated in the " +
       s"property ${LogConfig.FollowerReplicationThrottledReplicasProp} (for each topic). This property can be only set dynamically. It is suggested that the " +
       s"limit be kept above 1MB/s for accurate behaviour."
+    val ReplicaAlterLogDirsIoMaxBytesPerSecondDoc = "A long representing the upper bound (bytes/sec) on disk IO used for moving replicas between log directories on the same broker. " +
+      s"This property can be only set dynamically. It is suggested that the limit be kept above 1MB/s for accurate behaviour."
 
     //Definitions
     private val brokerConfigDef = new ConfigDef()
       //round minimum value down, to make it easier for users.
       .define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
       .define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
+      .define(ReplicaAlterLogDirsIoMaxBytesPerSecondProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, ReplicaAlterLogDirsIoMaxBytesPerSecondDoc)
 
     def names = brokerConfigDef.names
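
Like the existing leader/follower replication throttles, the new property is a per-broker dynamic config (typically applied with kafka-configs.sh using --entity-type brokers). A small check that relies only on what is defined above:

    // The new throttle is registered in the broker ConfigDef, so it is accepted wherever the
    // existing dynamic broker throttle properties are.
    assert(DynamicConfig.Broker.names.contains(
      DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp))
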
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 82efaba..3937358 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -178,6 +178,8 @@ object Defaults {
   val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
   val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples
   val ReplicationQuotaWindowSizeSeconds: Int = ReplicationQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
+  val NumAlterLogDirsReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples
+  val AlterLogDirsReplicationQuotaWindowSizeSeconds: Int = ReplicationQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
 
   /** ********* Transaction Configuration ***********/
   val TransactionalIdExpirationMsDefault = 604800000
@@ -237,6 +239,7 @@ object KafkaConfig {
   val NumNetworkThreadsProp = "num.network.threads"
   val NumIoThreadsProp = "num.io.threads"
   val BackgroundThreadsProp = "background.threads"
+  val NumReplicaAlterLogDirsThreadsProp = "num.replica.alter.log.dirs.threads"
   val QueuedMaxRequestsProp = "queued.max.requests"
   val QueuedMaxBytesProp = "queued.max.request.bytes"
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
@@ -363,8 +366,10 @@ object KafkaConfig {
   val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
   val NumQuotaSamplesProp = "quota.window.num"
   val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
+  val NumAlterLogDirsReplicationQuotaSamplesProp = "alter.log.dirs.replication.quota.window.num"
   val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
   val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds"
+  val AlterLogDirsReplicationQuotaWindowSizeSecondsProp = "alter.log.dirs.replication.quota.window.size.seconds"
 
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
@@ -423,6 +428,7 @@ object KafkaConfig {
     s"<p>This can be set per topic with the topic level <code>${TopicConfig.MAX_MESSAGE_BYTES_CONFIG}</code> config.</p>"
   val NumNetworkThreadsDoc = "The number of threads that the server uses for receiving requests from the network and sending responses to the network"
   val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
+  val NumReplicaAlterLogDirsThreadsDoc = "The number of threads that can move replicas between log directories, which may include disk I/O"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
   val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
   val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
@@ -619,8 +625,10 @@ object KafkaConfig {
   "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
   val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas"
   val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
+  val NumAlterLogDirsReplicationQuotaSamplesDoc = "The number of samples to retain in memory for alter log dirs replication quotas"
   val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
   val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
+  val AlterLogDirsReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for alter log dirs replication quotas"
   /** ********* Transaction Configuration ***********/
   val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " +
     "transaction coordinator. Note that this also influences producer id expiration: Producer ids are guaranteed to expire " +
@@ -691,6 +699,7 @@ object KafkaConfig {
       .define(MessageMaxBytesProp, INT, Defaults.MessageMaxBytes, atLeast(0), HIGH, MessageMaxBytesDoc)
       .define(NumNetworkThreadsProp, INT, Defaults.NumNetworkThreads, atLeast(1), HIGH, NumNetworkThreadsDoc)
       .define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc)
+      .define(NumReplicaAlterLogDirsThreadsProp, INT, null, HIGH, NumReplicaAlterLogDirsThreadsDoc)
       .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
       .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
       .define(QueuedMaxBytesProp, LONG, Defaults.QueuedMaxRequestBytes, MEDIUM, QueuedMaxRequestBytesDoc)
@@ -834,8 +843,10 @@ object KafkaConfig {
       .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
       .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
       .define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc)
+      .define(NumAlterLogDirsReplicationQuotaSamplesProp, INT, Defaults.NumAlterLogDirsReplicationQuotaSamples, atLeast(1), LOW, NumAlterLogDirsReplicationQuotaSamplesDoc)
       .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
       .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc)
+      .define(AlterLogDirsReplicationQuotaWindowSizeSecondsProp, INT, Defaults.AlterLogDirsReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, AlterLogDirsReplicationQuotaWindowSizeSecondsDoc)
 
       /** ********* SSL Configuration ****************/
       .define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc)
@@ -915,6 +926,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
   val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
 
+  def getNumReplicaAlterLogDirsThreads: Int = {
+    val numThreads: Integer = Option(getInt(KafkaConfig.NumReplicaAlterLogDirsThreadsProp)).getOrElse(logDirs.size)
+    numThreads
+  }
+
   /************* Authorizer Configuration ***********/
   val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)
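
A sketch of the default computed by getNumReplicaAlterLogDirsThreads above: when num.replica.alter.log.dirs.threads is not set, one alter-log-dirs thread per configured log directory is used. The property values are placeholders; zookeeper.connect is set only because KafkaConfig requires it.

    val props = new java.util.Properties()
    props.put("zookeeper.connect", "localhost:2181")
    props.put("log.dirs", "/tmp/kafka-logs-1,/tmp/kafka-logs-2,/tmp/kafka-logs-3")

    val config = KafkaConfig.fromProps(props)
    println(config.getNumReplicaAlterLogDirsThreads)  // 3: one thread per log directory
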
 
@@ -1076,6 +1092,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp)
   val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp)
   val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
+  val numAlterLogDirsReplicationQuotaSamples = getInt(KafkaConfig.NumAlterLogDirsReplicationQuotaSamplesProp)
+  val alterLogDirsReplicationQuotaWindowSizeSeconds = getInt(KafkaConfig.AlterLogDirsReplicationQuotaWindowSizeSecondsProp)
 
   /** ********* Transaction Configuration **************/
   val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f2d4b16..a8c0a4a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -313,7 +313,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   }
 
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
-    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower,
+    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
       brokerTopicStats, metadataCache, logDirFailureChannel)
 
   private def initZk(): ZkUtils = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index b70f2e5..13cd010 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -28,6 +28,7 @@ object QuotaType  {
   case object Request extends QuotaType
   case object LeaderReplication extends QuotaType
   case object FollowerReplication extends QuotaType
+  case object AlterLogDirsReplication extends QuotaType
 }
 sealed trait QuotaType
 
@@ -38,7 +39,12 @@ object QuotaFactory extends Logging {
     override def isQuotaExceeded(): Boolean = false
   }
 
-  case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, request: ClientRequestQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
+  case class QuotaManagers(fetch: ClientQuotaManager,
+                           produce: ClientQuotaManager,
+                           request: ClientRequestQuotaManager,
+                           leader: ReplicationQuotaManager,
+                           follower: ReplicationQuotaManager,
+                           alterLogDirs: ReplicationQuotaManager) {
     def shutdown() {
       fetch.shutdown
       produce.shutdown
@@ -52,7 +58,8 @@ object QuotaFactory extends Logging {
       new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
       new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time),
       new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
-      new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time)
+      new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
+      new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time)
     )
   }
 
@@ -83,9 +90,18 @@ object QuotaFactory extends Logging {
     )
   }
 
-  def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig =
+  def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig = {
     ReplicationQuotaManagerConfig(
       numQuotaSamples = cfg.numReplicationQuotaSamples,
       quotaWindowSizeSeconds = cfg.replicationQuotaWindowSizeSeconds
     )
+  }
+
+  def alterLogDirsReplicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig = {
+    ReplicationQuotaManagerConfig(
+      numQuotaSamples = cfg.numAlterLogDirsReplicationQuotaSamples,
+      quotaWindowSizeSeconds = cfg.alterLogDirsReplicationQuotaWindowSizeSeconds
+    )
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
new file mode 100644
index 0000000..bded125
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
@@ -0,0 +1,40 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+
+import kafka.cluster.BrokerEndPoint
+
+class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
+                                 replicaManager: ReplicaManager,
+                                 quotaManager: ReplicationQuotaManager,
+                                 brokerTopicStats: BrokerTopicStats)
+  extends AbstractFetcherManager(s"ReplicaAlterLogDirsManager on broker ${brokerConfig.brokerId}",
+    "ReplicaAlterLogDirs", brokerConfig.getNumReplicaAlterLogDirsThreads) {
+
+  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+    val threadName = s"ReplicaAlterLogDirsThread-$fetcherId"
+    new ReplicaAlterLogDirsThread(threadName, sourceBroker, brokerConfig, replicaManager, quotaManager, brokerTopicStats)
+  }
+
+  def shutdown() {
+    info("shutting down")
+    closeAllFetchers()
+    info("shutdown completed")
+  }
+}
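
Which ReplicaAlterLogDirsThread a partition lands on follows the same hashing as regular replica fetchers (AbstractFetcherManager.getFetcherId, shown earlier in this commit). Restated as a sketch, assuming Utils here is org.apache.kafka.common.utils.Utils as used in that file:

    import org.apache.kafka.common.utils.Utils

    // Same formula as AbstractFetcherManager.getFetcherId: spread partitions over the configured
    // number of alter-log-dirs fetcher threads by a hash of (topic, partition).
    def fetcherIdFor(topic: String, partitionId: Int, numFetchers: Int): Int =
      Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
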

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
new file mode 100644
index 0000000..48c83d4
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -0,0 +1,266 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.nio.ByteBuffer
+import java.util
+
+import AbstractFetcherThread.ResultWithPartitions
+import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchRequest => JFetchRequest}
+import ReplicaAlterLogDirsThread.FetchRequest
+import ReplicaAlterLogDirsThread.PartitionData
+import kafka.api.Request
+import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+
+import scala.collection.JavaConverters._
+import scala.collection.{Map, Seq, Set, mutable}
+
+
+class ReplicaAlterLogDirsThread(name: String,
+                                sourceBroker: BrokerEndPoint,
+                                brokerConfig: KafkaConfig,
+                                replicaMgr: ReplicaManager,
+                                quota: ReplicationQuotaManager,
+                                brokerTopicStats: BrokerTopicStats)
+  extends AbstractFetcherThread(name = name,
+                                clientId = name,
+                                sourceBroker = sourceBroker,
+                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
+                                isInterruptible = false,
+                                includeLogTruncation = true) {
+
+  type REQ = FetchRequest
+  type PD = PartitionData
+
+  private val replicaId = brokerConfig.brokerId
+  private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
+  private val fetchSize = brokerConfig.replicaFetchMaxBytes
+
+  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get)
+
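+  // Fetch data for the requested partitions by reading from the current replicas on this broker;
+  // the read is served locally through ReplicaManager, so no network request is sent.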
+  def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
+    var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = null
+    val request = fetchRequest.underlying.build()
+
+    def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
+      partitionData = responsePartitionData.map { case (tp, data) =>
+        val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
+        val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+        tp -> new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
+          data.logStartOffset, abortedTransactions, data.records)
+      }
+    }
+
+    replicaMgr.fetchMessages(
+      0L, // timeout is 0 so that the callback will be executed immediately
+      Request.FutureLocalReplicaId,
+      request.minBytes,
+      request.maxBytes,
+      request.version <= 2,
+      request.fetchData.asScala.toSeq,
+      UnboundedQuota,
+      processResponseCallback,
+      request.isolationLevel)
+
+    if (partitionData == null)
+      throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
+
+    partitionData.map { case (key, value) =>
+      key -> new PartitionData(value)
+    }
+  }
+
+  // Process fetched data: append it to the future replica's log and, once the future replica has
+  // caught up, swap it in for the current replica and stop fetching this partition.
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
+    val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
+    val partition = replicaMgr.getPartition(topicPartition).get
+    val records = partitionData.toRecords
+
+    if (fetchOffset != futureReplica.logEndOffset.messageOffset)
+      throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
+        topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
+
+    // Append the leader's messages to the log
+    partition.appendRecordsToFutureReplica(records)
+    futureReplica.highWatermark = new LogOffsetMetadata(partitionData.highWatermark)
+    futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
+
+    if (partition.maybeReplaceCurrentWithFutureReplica())
+      removePartitions(Set(topicPartition))
+
+    quota.record(records.sizeInBytes)
+  }
+
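+  // The fetch offset is out of range with respect to the current replica: either truncate the
+  // future replica back to the current replica's log end offset, or, if the future replica has
+  // fallen behind the current replica's log start offset, truncate it fully and restart from there.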
+  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
+    val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
+    val currentReplica = replicaMgr.getReplicaOrException(topicPartition)
+    val partition = replicaMgr.getPartition(topicPartition).get
+    val logEndOffset: Long = currentReplica.logEndOffset.messageOffset
+
+    if (logEndOffset < futureReplica.logEndOffset.messageOffset) {
+      warn("Future replica for partition %s reset its fetch offset from %d to current replica's latest offset %d"
+        .format(topicPartition, futureReplica.logEndOffset.messageOffset, logEndOffset))
+      partition.truncateTo(logEndOffset, isFuture = true)
+      logEndOffset
+    } else {
+      val currentReplicaStartOffset: Long = currentReplica.logStartOffset
+      warn("Future replica for partition %s reset its fetch offset from %d to current replica's start offset %d"
+        .format(topicPartition, futureReplica.logEndOffset.messageOffset, currentReplicaStartOffset))
+      val offsetToFetch = Math.max(currentReplicaStartOffset, futureReplica.logEndOffset.messageOffset)
+      // Only truncate the log when current replica's log start offset is greater than future replica's log end offset.
+      if (currentReplicaStartOffset > futureReplica.logEndOffset.messageOffset)
+        partition.truncateFullyAndStartAt(currentReplicaStartOffset, isFuture = true)
+      offsetToFetch
+    }
+  }
+
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
+    if (partitions.nonEmpty)
+      delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
+  }
+
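+  // Build the leaderEpoch request for partitions that are still truncating; partitions whose
+  // epoch cache is unavailable are returned separately so that they can be retried later.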
+  def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
+    val partitionEpochOpts = allPartitions
+      .filter { case (_, state) => state.isTruncatingLog }
+      .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
+
+    val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, cacheOpt) => cacheOpt.nonEmpty }
+
+    val result = partitionsWithEpoch.map { case (tp, cacheOpt) => tp -> cacheOpt.get.latestEpoch() }
+    ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
+  }
+
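+  // The "leader" for this thread is the current replica on the same broker, so the epoch end
+  // offsets are looked up directly from the local epoch cache rather than over the network.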
+  def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
+    partitions.map { case (tp, epoch) =>
+      try {
+        tp -> new EpochEndOffset(Errors.NONE, replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch))
+      } catch {
+        case t: Throwable =>
+          warn(s"Error when getting EpochEndOffset for $tp", t)
+          tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH_OFFSET)
+      }
+    }
+  }
+
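+  // Truncate the future replica of each partition to the epoch end offset reported by the current
+  // replica (capped at the future replica's log end offset), or to the current fetch offset if the
+  // epoch end offset is undefined, before fetching resumes.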
+  def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
+    val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
+    val partitionsWithError = mutable.Set[TopicPartition]()
+
+    fetchedEpochs.foreach { case (topicPartition, epochOffset) =>
+      try {
+        val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
+        val partition = replicaMgr.getPartition(topicPartition).get
+
+        if (epochOffset.hasError) {
+          info(s"Retrying leaderEpoch request for partition $topicPartition as the current replica reported an error: ${epochOffset.error}")
+          partitionsWithError += topicPartition
+        } else {
+          val fetchOffset =
+            if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
+              partitionStates.stateValue(topicPartition).fetchOffset
+            else if (epochOffset.endOffset >= futureReplica.logEndOffset.messageOffset)
+              futureReplica.logEndOffset.messageOffset
+            else
+              epochOffset.endOffset
+
+          partition.truncateTo(fetchOffset, isFuture = true)
+          fetchOffsets.put(topicPartition, fetchOffset)
+        }
+      } catch {
+        case e: KafkaStorageException =>
+          info(s"Failed to truncate $topicPartition", e)
+          partitionsWithError += topicPartition
+      }
+    }
+    ResultWithPartitions(fetchOffsets, partitionsWithError)
+  }
+
+  def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
+    // Only include a partition in the fetch request if it is ready for fetch and the quota has not been exceeded.
+    val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) =>
+      partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
+    }.reduceLeftOption { (left, right) =>
+      if (left._1.topic > right._1.topic || (left._1.topic == right._1.topic && left._1.partition >= right._1.partition))
+        left
+      else
+        right
+    }
+
+    // Only move one replica at a time to increase its catch-up rate and thus reduce the time spent on moving any given replica
+    // Replicas are ordered by their TopicPartition
+    val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
+    val partitionsWithError = mutable.Set[TopicPartition]()
+
+    if (maxPartitionOpt.nonEmpty) {
+      val (topicPartition, partitionFetchState) = maxPartitionOpt.get
+      try {
+        val logStartOffset = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId).logStartOffset
+        requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
+      } catch {
+        case e: KafkaStorageException =>
+          partitionsWithError += topicPartition
+      }
+    }
+    // Set maxWait and minBytes to 0 so that the local read returns immediately even when
+    // the future log has already caught up with the current log and there is no new data to copy
+    val requestBuilder = JFetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
+    ResultWithPartitions(new FetchRequest(requestBuilder), partitionsWithError)
+  }
+}
+
+object ReplicaAlterLogDirsThread {
+
+  private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest {
+    def isEmpty: Boolean = underlying.fetchData.isEmpty
+    def offset(topicPartition: TopicPartition): Long = underlying.fetchData.asScala(topicPartition).fetchOffset
+    override def toString = underlying.toString
+  }
+
+  private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
+
+    def error = underlying.error
+
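+    // The local read usually returns FileRecords backed by the current log's segment files; copy
+    // them into a MemoryRecords buffer so they can be appended to the future replica's log.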
+    def toRecords: MemoryRecords = {
+      if (underlying.records == MemoryRecords.EMPTY)
+        underlying.records.asInstanceOf[MemoryRecords]
+      else {
+        val buffer = ByteBuffer.allocate(underlying.records.sizeInBytes())
+        underlying.records.asInstanceOf[FileRecords].readInto(buffer, 0)
+        MemoryRecords.readableRecords(buffer)
+      }
+    }
+
+    def highWatermark: Long = underlying.highWatermark
+
+    def logStartOffset: Long = underlying.logStartOffset
+
+    def exception: Option[Throwable] = error match {
+      case Errors.NONE => None
+      case e => Some(e.exception)
+    }
+
+    override def toString = underlying.toString
+  }
+}
\ No newline at end of file
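
A note on buildFetchRequest above: it deliberately puts at most one partition into each fetch
request, picking the ready partition that sorts last by (topic, partition), so that each replica
being moved catches up as quickly as possible before the thread turns to the next one. A small
standalone sketch of that selection with simplified stand-in types (not the Kafka classes):

object MaxPartitionSelectionSketch extends App {
  final case class Tp(topic: String, partition: Int)

  // Mirrors the reduceLeftOption in buildFetchRequest: keep the pair that sorts last by topic
  // name, breaking ties by partition number.
  def selectOne(ready: Seq[Tp]): Option[Tp] =
    ready.reduceLeftOption { (left, right) =>
      if (left.topic > right.topic || (left.topic == right.topic && left.partition >= right.partition)) left
      else right
    }

  println(selectOne(Seq(Tp("logs", 0), Tp("metrics", 3), Tp("logs", 7)))) // Some(Tp(metrics,3))
}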

http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 52c6de3..a8acde4 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -87,19 +87,21 @@ class ReplicaFetcherThread(name: String,
   // process fetched data
   def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
     val replica = replicaMgr.getReplicaOrException(topicPartition)
+    val partition = replicaMgr.getPartition(topicPartition).get
     val records = partitionData.toRecords
 
     maybeWarnIfOversizedRecords(records, topicPartition)
 
     if (fetchOffset != replica.logEndOffset.messageOffset)
-      throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
+      throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
         topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
+
     if (logger.isTraceEnabled)
       trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
         .format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
 
     // Append the leader's messages to the log
-    replica.log.get.appendAsFollower(records)
+    partition.appendRecordsToFollower(records)
 
     if (logger.isTraceEnabled)
       trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
@@ -132,6 +134,7 @@ class ReplicaFetcherThread(name: String,
    */
   def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
     val replica = replicaMgr.getReplicaOrException(topicPartition)
+    val partition = replicaMgr.getPartition(topicPartition).get
 
     /**
      * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
@@ -159,7 +162,8 @@ class ReplicaFetcherThread(name: String,
 
       warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
         s"leader's latest offset $leaderEndOffset")
-      replicaMgr.logManager.truncateTo(Map(topicPartition -> leaderEndOffset))
+      partition.truncateTo(leaderEndOffset, isFuture = false)
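+      // Also mark the partition in the ReplicaAlterLogDirsManager so that any in-progress future
+      // replica is truncated to the same offset before it continues copying.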
+      replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, topicPartition, leaderEndOffset)
       leaderEndOffset
     } else {
       /**
@@ -189,8 +193,9 @@ class ReplicaFetcherThread(name: String,
         s"leader's start offset $leaderStartOffset")
       val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
       // Only truncate log when current leader's log start offset is greater than follower's log end offset.
-      if (leaderStartOffset > replica.logEndOffset.messageOffset)
-        replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset)
+      if (leaderStartOffset > replica.logEndOffset.messageOffset) {
+        partition.truncateFullyAndStartAt(leaderStartOffset, isFuture = false)
+      }
       offsetToFetch
     }
   }
@@ -261,27 +266,31 @@ class ReplicaFetcherThread(name: String,
     * - If the leader replied with undefined epoch offset we must use the high watermark
     */
   override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
-    val truncationPoints = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
+    val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
     val partitionsWithError = mutable.Set[TopicPartition]()
 
     fetchedEpochs.foreach { case (tp, epochOffset) =>
       try {
         val replica = replicaMgr.getReplicaOrException(tp)
+        val partition = replicaMgr.getPartition(tp).get
 
         if (epochOffset.hasError) {
           info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}")
           partitionsWithError += tp
         } else {
-          val truncationOffset =
-            if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
-              highWatermark(replica, epochOffset)
-            else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
+          val fetchOffset =
+            if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
+              warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
+                s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
+              partitionStates.stateValue(tp).fetchOffset
+            } else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
               logEndOffset(replica, epochOffset)
             else
               epochOffset.endOffset
 
-          replicaMgr.logManager.truncateTo(Map(tp -> truncationOffset))
-          truncationPoints.put(tp, truncationOffset)
+          partition.truncateTo(fetchOffset, isFuture = false)
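+          // Propagate the truncation point to the ReplicaAlterLogDirsManager so that an
+          // in-progress future replica stays consistent with the current replica.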
+          replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, fetchOffset)
+          fetchOffsets.put(tp, fetchOffset)
         }
       } catch {
         case e: KafkaStorageException =>
@@ -290,7 +299,7 @@ class ReplicaFetcherThread(name: String,
       }
     }
 
-    ResultWithPartitions(truncationPoints, partitionsWithError)
+    ResultWithPartitions(fetchOffsets, partitionsWithError)
   }
 
   override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
@@ -340,12 +349,6 @@ class ReplicaFetcherThread(name: String,
     logEndOffset
   }
 
-  private def highWatermark(replica: Replica, epochOffset: EpochEndOffset): Long = {
-    warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
-      s"High watermark ${replica.highWatermark.messageOffset} will be used for truncation.")
-    replica.highWatermark.messageOffset
-  }
-
   /**
    *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
    *  the quota is exceeded and the replica is not in sync.