Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/17 15:35:24 UTC
[3/4] kafka git commit: KAFKA-5163; Support replicas movement between log directories (KIP-113)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 6da494e..0c92c71 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -70,7 +70,11 @@ class LogManager(logDirs: Seq[File],
val InitialTaskDelayMs = 30 * 1000
private val logCreationOrDeletionLock = new Object
- private val logs = new Pool[TopicPartition, Log]()
+ private val currentLogs = new Pool[TopicPartition, Log]()
+ // Future logs are put in a directory with the "-future" suffix. A future log is created when a user wants to move a replica
+ // from one log directory to another log directory on the same broker. The directory of the future log will be renamed
+ // to replace the current log of the partition once the future log has caught up with the current log.
+ private val futureLogs = new Pool[TopicPartition, Log]()
private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
private val _liveLogDirs: ConcurrentLinkedQueue[File] = createAndValidateLogDirs(logDirs, initialOfflineDirs)
@@ -102,7 +106,7 @@ class LogManager(logDirs: Seq[File],
// public, so we can access this from kafka.admin.DeleteTopicTest
val cleaner: LogCleaner =
if(cleanerConfig.enableCleaner)
- new LogCleaner(cleanerConfig, liveLogDirs, logs, logDirFailureChannel, time = time)
+ new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
else
null
@@ -179,18 +183,30 @@ class LogManager(logDirs: Seq[File],
if (cleaner != null)
cleaner.handleLogDirFailure(dir)
- val offlineTopicPartitions = logs.collect {
+ val offlineCurrentTopicPartitions = currentLogs.collect {
case (tp, log) if log.dir.getParent == dir => tp
}
-
- offlineTopicPartitions.foreach { topicPartition =>
- val removedLog = logs.remove(topicPartition)
+ offlineCurrentTopicPartitions.foreach { topicPartition => {
+ val removedLog = currentLogs.remove(topicPartition)
if (removedLog != null) {
removedLog.closeHandlers()
removedLog.removeLogMetrics()
}
+ }}
+
+ val offlineFutureTopicPartitions = futureLogs.collect {
+ case (tp, log) if log.dir.getParent == dir => tp
}
- info(s"Partitions ${offlineTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
+ offlineFutureTopicPartitions.foreach { topicPartition => {
+ val removedLog = futureLogs.remove(topicPartition)
+ if (removedLog != null) {
+ removedLog.closeHandlers()
+ removedLog.removeLogMetrics()
+ }
+ }}
+
+ info(s"Logs for partitions ${offlineCurrentTopicPartitions.mkString(",")} are offline and " +
+ s"logs for future partitions ${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
dirLocks.filter(_.file.getParent == dir).foreach(dir => CoreUtils.swallow(dir.destroy()))
}
}
@@ -221,7 +237,7 @@ class LogManager(logDirs: Seq[File],
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
- val current = Log(
+ val log = Log(
dir = logDir,
config = config,
logStartOffset = logStartOffset,
@@ -234,13 +250,22 @@ class LogManager(logDirs: Seq[File],
logDirFailureChannel = logDirFailureChannel)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
- this.logsToBeDeleted.add(current)
+ this.logsToBeDeleted.add(log)
} else {
- val previous = this.logs.put(topicPartition, current)
+ val previous = {
+ if (log.isFuture)
+ this.futureLogs.put(topicPartition, log)
+ else
+ this.currentLogs.put(topicPartition, log)
+ }
if (previous != null) {
- throw new IllegalArgumentException(
- "Duplicate log directories found: %s, %s!".format(
- current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+ if (log.isFuture)
+ throw new IllegalStateException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+ else
+ throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " +
+ s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " +
+ s"replacing current replica with future replica. Recover broker from this failure by manually deleting one of the two directories " +
+ s"for this partition. It is recommended to delete the partition in the log directory that is known to have failed recently.")
}
}
}
@@ -252,7 +277,7 @@ class LogManager(logDirs: Seq[File],
info("Loading logs.")
val startMs = time.milliseconds
val threadPools = ArrayBuffer.empty[ExecutorService]
- val offlineDirs = ArrayBuffer.empty[(String, IOException)]
+ val offlineDirs = mutable.Set.empty[(String, IOException)]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
for (dir <- liveLogDirs) {
@@ -295,7 +320,7 @@ class LogManager(logDirs: Seq[File],
loadLog(logDir, recoveryPoints, logStartOffsets)
} catch {
case e: IOException =>
- offlineDirs.append((dir.getAbsolutePath, e))
+ offlineDirs.add((dir.getAbsolutePath, e))
error("Error while loading log dir " + dir.getAbsolutePath, e)
}
}
@@ -303,7 +328,7 @@ class LogManager(logDirs: Seq[File],
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
} catch {
case e: IOException =>
- offlineDirs.append((dir.getAbsolutePath, e))
+ offlineDirs.add((dir.getAbsolutePath, e))
error("Error while loading log dir " + dir.getAbsolutePath, e)
}
}
@@ -315,10 +340,11 @@ class LogManager(logDirs: Seq[File],
cleanShutdownFile.delete()
} catch {
case e: IOException =>
- offlineDirs.append((cleanShutdownFile.getParent, e))
+ offlineDirs.add((cleanShutdownFile.getParent, e))
error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
}
}
+
offlineDirs.foreach { case (dir, e) =>
logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
}
@@ -442,24 +468,30 @@ class LogManager(logDirs: Seq[File],
* Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
*
* @param partitionOffsets Partition logs that need to be truncated
+ * @param isFuture True iff the truncation should be performed on the future log of the specified partitions
*/
- def truncateTo(partitionOffsets: Map[TopicPartition, Long]) {
+ def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean) {
var truncated = false
for ((topicPartition, truncateOffset) <- partitionOffsets) {
- val log = logs.get(topicPartition)
+ val log = {
+ if (isFuture)
+ futureLogs.get(topicPartition)
+ else
+ currentLogs.get(topicPartition)
+ }
// If the log does not exist, skip it
if (log != null) {
//May need to abort and pause the cleaning of the log, and resume after truncation is done.
val needToStopCleaner = cleaner != null && truncateOffset < log.activeSegment.baseOffset
- if (needToStopCleaner)
+ if (needToStopCleaner && !isFuture)
cleaner.abortAndPauseCleaning(topicPartition)
try {
if (log.truncateTo(truncateOffset))
truncated = true
- if (needToStopCleaner)
+ if (needToStopCleaner && !isFuture)
cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
} finally {
- if (needToStopCleaner)
+ if (needToStopCleaner && !isFuture)
cleaner.resumeCleaning(topicPartition)
}
}
@@ -470,24 +502,31 @@ class LogManager(logDirs: Seq[File],
}
/**
- * Delete all data in a partition and start the log at the new offset
+ * Delete all data in a partition and start the log at the new offset
*
- * @param newOffset The new offset to start the log with
+ * @param topicPartition The partition whose log needs to be truncated
+ * @param newOffset The new offset to start the log with
+ * @param isFuture True iff the truncation should be performed on the future log of the specified partition
*/
- def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long) {
- val log = logs.get(topicPartition)
+ def truncateFullyAndStartAt(topicPartition: TopicPartition, newOffset: Long, isFuture: Boolean) {
+ val log = {
+ if (isFuture)
+ futureLogs.get(topicPartition)
+ else
+ currentLogs.get(topicPartition)
+ }
// If the log does not exist, skip it
if (log != null) {
//Abort and pause the cleaning of the log, and resume after truncation is done.
- if (cleaner != null)
+ if (cleaner != null && !isFuture)
cleaner.abortAndPauseCleaning(topicPartition)
log.truncateFullyAndStartAt(newOffset)
- if (cleaner != null) {
+ if (cleaner != null && !isFuture) {
cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
cleaner.resumeCleaning(topicPartition)
}
+ checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
}
- checkpointLogRecoveryOffsets()
}
/**
@@ -516,7 +555,7 @@ class LogManager(logDirs: Seq[File],
} {
try {
checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
- logs.values.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
+ allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
} catch {
case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
@@ -545,33 +584,61 @@ class LogManager(logDirs: Seq[File],
}
}
- def updatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
- // The logDir should be an absolute path
- preferredLogDirs.put(topicPartition, logDir)
+ // The logDir should be an absolute path
+ def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
+ // Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir
+ if (!getLog(topicPartition).exists(_.dir.getParent == logDir) &&
+ !getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir))
+ preferredLogDirs.put(topicPartition, logDir)
+ }
+
+ def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
+ if (cleaner != null)
+ cleaner.abortAndPauseCleaning(topicPartition)
}
+
/**
* Get the log if it exists, otherwise return None
+ *
+ * @param topicPartition the partition of the log
+ * @param isFuture True iff the future log of the specified partition should be returned
*/
- def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
+ def getLog(topicPartition: TopicPartition, isFuture: Boolean = false): Option[Log] = {
+ if (isFuture)
+ Option(futureLogs.get(topicPartition))
+ else
+ Option(currentLogs.get(topicPartition))
+ }
/**
* If the log already exists, just return a copy of the existing log
* Otherwise if isNew=true or if there is no offline log directory, create a log for the given topic and the given partition
* Otherwise throw KafkaStorageException
*
+ * @param topicPartition The partition whose log needs to be returned or created
+ * @param config The configuration of the log that should be applied for log creation
* @param isNew Whether the replica should have existed on the broker or not
+ * @param isFuture True iff the future log of the specified partition should be returned or created
* @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker
*/
- def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false): Log = {
+ def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {
logCreationOrDeletionLock synchronized {
- getLog(topicPartition).getOrElse {
+ getLog(topicPartition, isFuture).getOrElse {
// create the log if it has not already been created in another thread
if (!isNew && offlineLogDirs.nonEmpty)
throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")
val logDir = {
val preferredLogDir = preferredLogDirs.get(topicPartition)
+
+ if (isFuture) {
+ if (preferredLogDir == null)
+ throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
+ else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)
+ throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
+ }
+
if (preferredLogDir != null)
preferredLogDir
else
@@ -581,7 +648,12 @@ class LogManager(logDirs: Seq[File],
throw new KafkaStorageException(s"Can not create log for $topicPartition because log directory $logDir is offline")
try {
- val dir = new File(logDir, topicPartition.topic + "-" + topicPartition.partition)
+ val dir = {
+ if (isFuture)
+ new File(logDir, Log.logFutureDirName(topicPartition))
+ else
+ new File(logDir, Log.logDirName(topicPartition))
+ }
Files.createDirectories(dir.toPath)
val log = Log(
@@ -596,7 +668,10 @@ class LogManager(logDirs: Seq[File],
brokerTopicStats = brokerTopicStats,
logDirFailureChannel = logDirFailureChannel)
- logs.put(topicPartition, log)
+ if (isFuture)
+ futureLogs.put(topicPartition, log)
+ else
+ currentLogs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
@@ -641,44 +716,78 @@ class LogManager(logDirs: Seq[File],
}
/**
+ * Mark the partition directory in the source log directory for deletion and
+ * rename the future log of this partition in the destination log directory to be the current log
+ *
+ * @param topicPartition TopicPartition that needs to be swapped
+ */
+ def replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit = {
+ logCreationOrDeletionLock synchronized {
+ val sourceLog = currentLogs.get(topicPartition)
+ val destLog = futureLogs.get(topicPartition)
+
+ if (sourceLog == null)
+ throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
+ if (destLog == null)
+ throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
+
+ destLog.renameDir(Log.logDirName(topicPartition))
+ // Now that future replica has been successfully renamed to be the current replica
+ // Update the cached map and log cleaner as appropriate.
+ futureLogs.remove(topicPartition)
+ currentLogs.put(topicPartition, destLog)
+ if (cleaner != null) {
+ cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile)
+ cleaner.resumeCleaning(topicPartition)
+ }
+
+ try {
+ sourceLog.renameDir(Log.logDeleteDirName(topicPartition))
+ // Now that replica in source log directory has been successfully renamed for deletion.
+ // Close the log, update checkpoint files, and enqueue this log to be deleted.
+ sourceLog.close()
+ checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
+ checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
+ logsToBeDeleted.add(sourceLog)
+ } catch {
+ case e: KafkaStorageException =>
+ // If sourceLog's log directory is offline, we need close its handlers here.
+ // handleLogDirFailure() will not close handlers of sourceLog because it has been removed from currentLogs map
+ sourceLog.closeHandlers()
+ sourceLog.removeLogMetrics()
+ throw e
+ }
+
+ info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+ }
+ }
+
+ /**
* Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
* add it in the queue for deletion.
*
* @param topicPartition TopicPartition that needs to be deleted
+ * @param isFuture True iff the future log of the specified partition should be deleted
* @return the removed log
*/
- def asyncDelete(topicPartition: TopicPartition): Log = {
+ def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
val removedLog: Log = logCreationOrDeletionLock synchronized {
- logs.remove(topicPartition)
+ if (isFuture)
+ futureLogs.remove(topicPartition)
+ else
+ currentLogs.remove(topicPartition)
}
if (removedLog != null) {
- try {
- //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
- if (cleaner != null) {
- cleaner.abortCleaning(topicPartition)
- cleaner.updateCheckpoints(removedLog.dir.getParentFile)
- }
- val dirName = Log.logDeleteDirName(removedLog.name)
- removedLog.close()
- val renamedDir = new File(removedLog.dir.getParent, dirName)
- val renameSuccessful = removedLog.dir.renameTo(renamedDir)
- if (renameSuccessful) {
- checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
- removedLog.dir = renamedDir
- // change the file pointers for log and index file
- removedLog.logSegments.foreach(_.updateDir(renamedDir))
- logsToBeDeleted.add(removedLog)
- removedLog.removeLogMetrics()
- info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
- } else {
- throw new IOException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
- }
- } catch {
- case e: IOException =>
- val msg = s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}."
- logDirFailureChannel.maybeAddOfflineLogDir(removedLog.dir.getParent, msg, e)
- throw new KafkaStorageException(msg, e)
+ //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
+ if (cleaner != null && !isFuture) {
+ cleaner.abortCleaning(topicPartition)
+ cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
+ removedLog.renameDir(Log.logDeleteDirName(topicPartition))
+ checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
+ checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
+ logsToBeDeleted.add(removedLog)
+ info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
} else if (offlineLogDirs.nonEmpty) {
throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(","))
}
@@ -724,18 +833,21 @@ class LogManager(logDirs: Seq[File],
/**
* Get all the partition logs
*/
- def allLogs: Iterable[Log] = logs.values
+ def allLogs: Iterable[Log] = currentLogs.values ++ futureLogs.values
- /**
- * Get a map of TopicPartition => Log
- */
- def logsByTopicPartition: Map[TopicPartition, Log] = logs.toMap
+ def logsByTopic(topic: String): Seq[Log] = {
+ (currentLogs.toList ++ futureLogs.toList).filter { case (topicPartition, log) =>
+ topicPartition.topic() == topic
+ }.map { case (topicPartition, log) => log }
+ }
/**
* Map of log dir to logs by topic and partitions in that dir
*/
private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
- this.logsByTopicPartition.groupBy { case (_, log) => log.dir.getParent }
+ (this.currentLogs.toList ++ this.futureLogs.toList).groupBy {
+ case (_, log) => log.dir.getParent
+ }.mapValues(_.toMap)
}
// logDir should be an absolute path
@@ -753,7 +865,7 @@ class LogManager(logDirs: Seq[File],
private def flushDirtyLogs(): Unit = {
debug("Checking for dirty logs to flush...")
- for ((topicPartition, log) <- logs) {
+ for ((topicPartition, log) <- currentLogs.toList ++ futureLogs.toList) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug("Checking if flush is needed on " + topicPartition.topic + " flush interval " + log.config.flushMs +
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 63c1f56..da84b19 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -457,7 +457,7 @@ object ProducerStateManager {
*/
@nonthreadsafe
class ProducerStateManager(val topicPartition: TopicPartition,
- val logDir: File,
+ @volatile var logDir: File,
val maxProducerIdExpirationMs: Int = 60 * 60 * 1000) extends Logging {
import ProducerStateManager._
import java.util
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index a8316b4..c385d4f 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -69,13 +69,24 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}
+ // This method is only needed by ReplicaAlterDirManager
+ def markPartitionsForTruncation(brokerId: Int, topicPartition: TopicPartition, truncationOffset: Long) {
+ mapLock synchronized {
+ val fetcherId = getFetcherId(topicPartition.topic, topicPartition.partition)
+ val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerId, fetcherId)
+ fetcherThreadMap.get(brokerIdAndFetcherId).foreach { thread =>
+ thread.markPartitionsForTruncation(topicPartition, truncationOffset)
+ }
+ }
+ }
+
// to be defined in subclass to create a specific fetcher
def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
mapLock synchronized {
- val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialOffset) =>
- BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
+ val partitionsPerFetcher = partitionAndOffsets.groupBy { case(topicPartition, brokerAndInitialFetchOffset) =>
+ BrokerAndFetcherId(brokerAndInitialFetchOffset.broker, getFetcherId(topicPartition.topic, topicPartition.partition))}
def addAndStartFetcherThread(brokerAndFetcherId: BrokerAndFetcherId, brokerIdAndFetcherId: BrokerIdAndFetcherId) {
val fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
@@ -83,7 +94,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
fetcherThread.start
}
- for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
+ for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
fetcherThreadMap.get(brokerIdAndFetcherId) match {
case Some(f) if f.sourceBroker.host == brokerAndFetcherId.broker.host && f.sourceBroker.port == brokerAndFetcherId.broker.port =>
@@ -95,7 +106,7 @@ abstract class AbstractFetcherManager(protected val name: String, clientId: Stri
addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
}
- fetcherThreadMap(brokerIdAndFetcherId).addPartitions(partitionAndOffsets.map { case (tp, brokerAndInitOffset) =>
+ fetcherThreadMap(brokerIdAndFetcherId).addPartitions(initialFetchOffsets.map { case (tp, brokerAndInitOffset) =>
tp -> brokerAndInitOffset.initOffset
})
}
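markPartitionsForTruncation above locates the owning fetcher thread with the same fetcher-id hash that addFetcherForPartitions uses (Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers, visible in the context lines). A small sketch of that routing, using Math.floorMod in place of Kafka's Utils.abs, so the resulting ids are illustrative rather than identical to the broker's:

// Sketch of partition-to-fetcher routing; floorMod stands in for Kafka's Utils.abs.
final case class TopicPartition(topic: String, partition: Int)

def fetcherId(tp: TopicPartition, numFetchers: Int): Int =
  Math.floorMod(31 * tp.topic.hashCode + tp.partition, numFetchers)

object FetcherRoutingDemo extends App {
  val partitions = Seq(TopicPartition("payments", 0), TopicPartition("payments", 1), TopicPartition("clicks", 3))
  partitions.foreach(tp => println(s"$tp -> fetcher ${fetcherId(tp, numFetchers = 4)}"))
}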
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index ddf6855..b078073 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -21,11 +21,10 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.BrokerEndPoint
import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
-import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
import kafka.common.{ClientIdAndBroker, KafkaException}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
-import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.protocol.Errors
import AbstractFetcherThread._
@@ -84,7 +83,7 @@ abstract class AbstractFetcherThread(name: String,
protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
- override def shutdown(){
+ override def shutdown() {
initiateShutdown()
inLock(partitionMapLock) {
partitionMapCond.signalAll()
@@ -132,9 +131,9 @@ abstract class AbstractFetcherThread(name: String,
inLock(partitionMapLock) {
//Check no leadership changes happened whilst we were unlocked, fetching epochs
val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
- val ResultWithPartitions(truncationPoints, partitionsWithError) = maybeTruncate(leaderEpochs)
+ val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncate(leaderEpochs)
handlePartitionsWithErrors(partitionsWithError)
- markTruncationComplete(truncationPoints)
+ markTruncationCompleteAndUpdateFetchOffset(fetchOffsets)
}
}
}
@@ -170,8 +169,10 @@ abstract class AbstractFetcherThread(name: String,
val topic = topicPartition.topic
val partitionId = topicPartition.partition
Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
- // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
- if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset) {
+ // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
+ // In this case, we only want to process the fetch response if the partition state is ready for fetch and the current offset is the same as the offset requested.
+ if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset &&
+ currentPartitionFetchState.isReadyForFetch) {
partitionData.error match {
case Errors.NONE =>
try {
@@ -184,7 +185,8 @@ abstract class AbstractFetcherThread(name: String,
processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData)
val validBytes = records.validBytes
- if (validBytes > 0) {
+ // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
+ if (validBytes > 0 && partitionStates.contains(topicPartition)) {
// Update partitionStates only if there is no exception during processPartitionData
partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
fetcherStats.byteRate.mark(validBytes)
@@ -195,10 +197,10 @@ abstract class AbstractFetcherThread(name: String,
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
// should get fixed in the subsequent fetches
- logger.error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset} error ${ime.getMessage}")
+ error(s"Found invalid messages during fetch for partition $topicPartition offset ${currentPartitionFetchState.fetchOffset}", ime)
partitionsWithError += topicPartition
case e: KafkaStorageException =>
- logger.error(s"Error while processing data for partition $topicPartition", e)
+ error(s"Error while processing data for partition $topicPartition", e)
partitionsWithError += topicPartition
case e: Throwable =>
throw new KafkaException(s"Error processing data for partition $topicPartition " +
@@ -212,12 +214,12 @@ abstract class AbstractFetcherThread(name: String,
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
- error(s"Error getting offset for partition $topicPartition to broker ${sourceBroker.id}", e)
+ error(s"Error getting offset for partition $topicPartition from broker ${sourceBroker.id}", e)
partitionsWithError += topicPartition
}
case _ =>
if (isRunning.get) {
- error(s"Error for partition $topicPartition to broker ${sourceBroker.id}:${partitionData.exception.get}")
+ error(s"Error for partition $topicPartition from broker ${sourceBroker.id}", partitionData.exception.get)
partitionsWithError += topicPartition
}
}
@@ -231,18 +233,31 @@ abstract class AbstractFetcherThread(name: String,
handlePartitionsWithErrors(partitionsWithError)
}
- def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]) {
+ def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long) {
+ if (!includeLogTruncation)
+ throw new IllegalStateException("Truncation should not be requested if includeLogTruncation is disabled")
+ partitionMapLock.lockInterruptibly()
+ try {
+ Option(partitionStates.stateValue(topicPartition)).foreach { state =>
+ val newState = PartitionFetchState(math.min(truncationOffset, state.fetchOffset), state.delay, truncatingLog = true)
+ partitionStates.updateAndMoveToEnd(topicPartition, newState)
+ }
+ partitionMapCond.signalAll()
+ } finally partitionMapLock.unlock()
+ }
+
+ def addPartitions(initialFetchOffsets: Map[TopicPartition, Long]) {
partitionMapLock.lockInterruptibly()
try {
// If the partitionMap already has the topic/partition, then do not update the map with the old offset
- val newPartitionToState = partitionAndOffsets.filter { case (tp, _) =>
+ val newPartitionToState = initialFetchOffsets.filter { case (tp, _) =>
!partitionStates.contains(tp)
- }.map { case (tp, offset) =>
+ }.map { case (tp, initialFetchOffset) =>
val fetchState =
- if (offset < 0)
+ if (initialFetchOffset < 0)
new PartitionFetchState(handleOffsetOutOfRange(tp), includeLogTruncation)
else
- new PartitionFetchState(offset, includeLogTruncation)
+ new PartitionFetchState(initialFetchOffset, includeLogTruncation)
tp -> fetchState
}
val existingPartitionToState = states().toMap
@@ -252,14 +267,14 @@ abstract class AbstractFetcherThread(name: String,
}
/**
- * Loop through all partitions, marking them as truncation complete and applying the correct offset
+ * Loop through all partitions, marking them as truncation complete and updating the fetch offset
*
- * @param partitions the partitions to mark truncation complete
+ * @param fetchOffsets the fetch offsets to apply to the partitions being marked truncation complete
*/
- private def markTruncationComplete(partitions: Map[TopicPartition, Long]) {
+ private def markTruncationCompleteAndUpdateFetchOffset(fetchOffsets: Map[TopicPartition, Long]) {
val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
.map { state =>
- val maybeTruncationComplete = partitions.get(state.topicPartition()) match {
+ val maybeTruncationComplete = fetchOffsets.get(state.topicPartition()) match {
case Some(offset) => PartitionFetchState(offset, state.value.delay, truncatingLog = false)
case None => state.value()
}
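The truncation bookkeeping added above works in two steps: markPartitionsForTruncation flags a partition as truncating and caps its fetch offset at the requested truncation offset, and markTruncationCompleteAndUpdateFetchOffset later clears the flag and installs the post-truncation fetch offset. Below is a toy Scala model of those transitions; the names are illustrative rather than Kafka's internal classes.

// Toy model of the per-partition fetch-state transitions described above.
final case class TopicPartition(topic: String, partition: Int)
final case class FetchState(fetchOffset: Long, truncatingLog: Boolean)

class FetchStateSketch {
  private var states = Map.empty[TopicPartition, FetchState]

  def addPartition(tp: TopicPartition, initialOffset: Long): Unit =
    states += tp -> FetchState(initialOffset, truncatingLog = true)

  // Request truncation: keep the smaller of the requested and current offsets.
  def markForTruncation(tp: TopicPartition, truncationOffset: Long): Unit =
    states.get(tp).foreach { s =>
      states += tp -> s.copy(fetchOffset = math.min(truncationOffset, s.fetchOffset), truncatingLog = true)
    }

  // Truncation finished: install the resolved fetch offset and resume fetching.
  def markTruncationComplete(fetchOffsets: Map[TopicPartition, Long]): Unit =
    states = states.map { case (tp, s) =>
      tp -> fetchOffsets.get(tp).map(off => FetchState(off, truncatingLog = false)).getOrElse(s)
    }
}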
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index ddeecb0..0edd638 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -52,7 +52,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
// Validate the configurations.
val configNamesToExclude = excludedConfigs(topic, topicConfig)
- val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer
+ val logs = logManager.logsByTopic(topic).toBuffer
if (logs.nonEmpty) {
/* combine the default properties with the overrides in zk to create the new LogConfig */
val props = new Properties()
@@ -61,10 +61,10 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
if (!configNamesToExclude.contains(key)) props.put(key, value)
}
val logConfig = LogConfig(props)
- if ((topicConfig.containsKey(LogConfig.RetentionMsProp)
+ if ((topicConfig.containsKey(LogConfig.RetentionMsProp)
|| topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
&& logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
- warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
+ warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${logConfig.messageTimestampDifferenceMaxMs}. " +
s"This may result in frequent log rolling.")
logs.foreach(_.config = logConfig)
@@ -97,7 +97,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
.map(_ (0).toInt).toSeq //convert to list of partition ids
}
}
-
+
def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = {
// Verify message format version
Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).flatMap { versionString =>
@@ -189,6 +189,7 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quo
if (brokerConfig.brokerId == brokerId.trim.toInt) {
quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp)))
quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp)))
+ quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp)))
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/DynamicConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 175bf62..ddfdff8 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -36,6 +36,7 @@ object DynamicConfig {
//Properties
val LeaderReplicationThrottledRateProp = "leader.replication.throttled.rate"
val FollowerReplicationThrottledRateProp = "follower.replication.throttled.rate"
+ val ReplicaAlterLogDirsIoMaxBytesPerSecondProp = "replica.alter.log.dirs.io.max.bytes.per.second"
//Defaults
val DefaultReplicationThrottledRate = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
@@ -47,12 +48,15 @@ object DynamicConfig {
val FollowerReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec) on replication traffic for followers enumerated in the " +
s"property ${LogConfig.FollowerReplicationThrottledReplicasProp} (for each topic). This property can be only set dynamically. It is suggested that the " +
s"limit be kept above 1MB/s for accurate behaviour."
+ val ReplicaAlterLogDirsIoMaxBytesPerSecondDoc = "A long representing the upper bound (bytes/sec) on disk IO used for moving replicas between log directories on the same broker. " +
+ s"This property can be only set dynamically. It is suggested that the limit be kept above 1MB/s for accurate behaviour."
//Definitions
private val brokerConfigDef = new ConfigDef()
//round minimum value down, to make it easier for users.
.define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
.define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
+ .define(ReplicaAlterLogDirsIoMaxBytesPerSecondProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, ReplicaAlterLogDirsIoMaxBytesPerSecondDoc)
def names = brokerConfigDef.names
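The new replica.alter.log.dirs.io.max.bytes.per.second property bounds the disk I/O rate used while copying a replica between directories; like the existing replication throttles it is enforced through a ReplicationQuotaManager that samples the observed byte rate. A minimal single-window sketch of that idea follows (an illustration only, not Kafka's quota implementation). Per the doc string the property is set dynamically as a per-broker config, typically via kafka-configs.sh with --entity-type brokers.

// Minimal sketch of a byte-rate throttle: record bytes moved in the current
// sample window and compare the observed rate against the configured bound.
class ByteRateThrottle(maxBytesPerSec: Long, windowMs: Long = 1000L) {
  private var windowStart = System.currentTimeMillis()
  private var bytesInWindow = 0L

  def record(bytes: Long): Unit = {
    val now = System.currentTimeMillis()
    if (now - windowStart >= windowMs) { windowStart = now; bytesInWindow = 0L }
    bytesInWindow += bytes
  }

  def isQuotaExceeded: Boolean = {
    val elapsedMs = math.max(1L, System.currentTimeMillis() - windowStart)
    bytesInWindow * 1000L / elapsedMs > maxBytesPerSec
  }
}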
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 82efaba..3937358 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -178,6 +178,8 @@ object Defaults {
val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples
val ReplicationQuotaWindowSizeSeconds: Int = ReplicationQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
+ val NumAlterLogDirsReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples
+ val AlterLogDirsReplicationQuotaWindowSizeSeconds: Int = ReplicationQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
/** ********* Transaction Configuration ***********/
val TransactionalIdExpirationMsDefault = 604800000
@@ -237,6 +239,7 @@ object KafkaConfig {
val NumNetworkThreadsProp = "num.network.threads"
val NumIoThreadsProp = "num.io.threads"
val BackgroundThreadsProp = "background.threads"
+ val NumReplicaAlterLogDirsThreadsProp = "num.replica.alter.log.dirs.threads"
val QueuedMaxRequestsProp = "queued.max.requests"
val QueuedMaxBytesProp = "queued.max.request.bytes"
val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
@@ -363,8 +366,10 @@ object KafkaConfig {
val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
val NumQuotaSamplesProp = "quota.window.num"
val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
+ val NumAlterLogDirsReplicationQuotaSamplesProp = "alter.log.dirs.replication.quota.window.num"
val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
val ReplicationQuotaWindowSizeSecondsProp = "replication.quota.window.size.seconds"
+ val AlterLogDirsReplicationQuotaWindowSizeSecondsProp = "alter.log.dirs.replication.quota.window.size.seconds"
val DeleteTopicEnableProp = "delete.topic.enable"
val CompressionTypeProp = "compression.type"
@@ -423,6 +428,7 @@ object KafkaConfig {
s"<p>This can be set per topic with the topic level <code>${TopicConfig.MAX_MESSAGE_BYTES_CONFIG}</code> config.</p>"
val NumNetworkThreadsDoc = "The number of threads that the server uses for receiving requests from the network and sending responses to the network"
val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
+ val NumReplicaAlterLogDirsThreadsDoc = "The number of threads that can move replicas between log directories, which may include disk I/O"
val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
@@ -619,8 +625,10 @@ object KafkaConfig {
"Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas"
val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
+ val NumAlterLogDirsReplicationQuotaSamplesDoc = "The number of samples to retain in memory for alter log dirs replication quotas"
val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas"
val ReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for replication quotas"
+ val AlterLogDirsReplicationQuotaWindowSizeSecondsDoc = "The time span of each sample for alter log dirs replication quotas"
/** ********* Transaction Configuration ***********/
val TransactionIdExpirationMsDoc = "The maximum time of inactivity before a transactional id is expired by the " +
"transaction coordinator. Note that this also influences producer id expiration: Producer ids are guaranteed to expire " +
@@ -691,6 +699,7 @@ object KafkaConfig {
.define(MessageMaxBytesProp, INT, Defaults.MessageMaxBytes, atLeast(0), HIGH, MessageMaxBytesDoc)
.define(NumNetworkThreadsProp, INT, Defaults.NumNetworkThreads, atLeast(1), HIGH, NumNetworkThreadsDoc)
.define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc)
+ .define(NumReplicaAlterLogDirsThreadsProp, INT, null, HIGH, NumReplicaAlterLogDirsThreadsDoc)
.define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
.define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
.define(QueuedMaxBytesProp, LONG, Defaults.QueuedMaxRequestBytes, MEDIUM, QueuedMaxRequestBytesDoc)
@@ -834,8 +843,10 @@ object KafkaConfig {
.define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
.define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
.define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc)
+ .define(NumAlterLogDirsReplicationQuotaSamplesProp, INT, Defaults.NumAlterLogDirsReplicationQuotaSamples, atLeast(1), LOW, NumAlterLogDirsReplicationQuotaSamplesDoc)
.define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc)
.define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc)
+ .define(AlterLogDirsReplicationQuotaWindowSizeSecondsProp, INT, Defaults.AlterLogDirsReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, AlterLogDirsReplicationQuotaWindowSizeSecondsDoc)
/** ********* SSL Configuration ****************/
.define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc)
@@ -915,6 +926,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
+ def getNumReplicaAlterLogDirsThreads: Int = {
+ val numThreads: Integer = Option(getInt(KafkaConfig.NumReplicaAlterLogDirsThreadsProp)).getOrElse(logDirs.size)
+ numThreads
+ }
+
/************* Authorizer Configuration ***********/
val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)
@@ -1076,6 +1092,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp)
val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp)
val replicationQuotaWindowSizeSeconds = getInt(KafkaConfig.ReplicationQuotaWindowSizeSecondsProp)
+ val numAlterLogDirsReplicationQuotaSamples = getInt(KafkaConfig.NumAlterLogDirsReplicationQuotaSamplesProp)
+ val alterLogDirsReplicationQuotaWindowSizeSeconds = getInt(KafkaConfig.AlterLogDirsReplicationQuotaWindowSizeSecondsProp)
/** ********* Transaction Configuration **************/
val transactionIdExpirationMs = getInt(KafkaConfig.TransactionalIdExpirationMsProp)
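getNumReplicaAlterLogDirsThreads above gives num.replica.alter.log.dirs.threads a computed default: the property is defined with a null default, and when it is unset the broker uses one mover thread per configured log directory. The same fallback, written out as a standalone sketch:

// Fallback shown in getNumReplicaAlterLogDirsThreads: if the property is not set,
// use one replica-mover thread per log directory.
def numReplicaAlterLogDirsThreads(configured: Option[Int], logDirs: Seq[String]): Int =
  configured.getOrElse(logDirs.size)

// e.g. numReplicaAlterLogDirsThreads(None, Seq("/disk1/kafka-logs", "/disk2/kafka-logs")) == 2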
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f2d4b16..a8c0a4a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -313,7 +313,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
- new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower,
+ new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel)
private def initZk(): ZkUtils = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/QuotaFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala
index b70f2e5..13cd010 100644
--- a/core/src/main/scala/kafka/server/QuotaFactory.scala
+++ b/core/src/main/scala/kafka/server/QuotaFactory.scala
@@ -28,6 +28,7 @@ object QuotaType {
case object Request extends QuotaType
case object LeaderReplication extends QuotaType
case object FollowerReplication extends QuotaType
+ case object AlterLogDirsReplication extends QuotaType
}
sealed trait QuotaType
@@ -38,7 +39,12 @@ object QuotaFactory extends Logging {
override def isQuotaExceeded(): Boolean = false
}
- case class QuotaManagers(fetch: ClientQuotaManager, produce: ClientQuotaManager, request: ClientRequestQuotaManager, leader: ReplicationQuotaManager, follower: ReplicationQuotaManager) {
+ case class QuotaManagers(fetch: ClientQuotaManager,
+ produce: ClientQuotaManager,
+ request: ClientRequestQuotaManager,
+ leader: ReplicationQuotaManager,
+ follower: ReplicationQuotaManager,
+ alterLogDirs: ReplicationQuotaManager) {
def shutdown() {
fetch.shutdown
produce.shutdown
@@ -52,7 +58,8 @@ object QuotaFactory extends Logging {
new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time),
new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
- new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time)
+ new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
+ new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time)
)
}
@@ -83,9 +90,18 @@ object QuotaFactory extends Logging {
)
}
- def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig =
+ def replicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig = {
ReplicationQuotaManagerConfig(
numQuotaSamples = cfg.numReplicationQuotaSamples,
quotaWindowSizeSeconds = cfg.replicationQuotaWindowSizeSeconds
)
+ }
+
+ def alterLogDirsReplicationConfig(cfg: KafkaConfig): ReplicationQuotaManagerConfig = {
+ ReplicationQuotaManagerConfig(
+ numQuotaSamples = cfg.numAlterLogDirsReplicationQuotaSamples,
+ quotaWindowSizeSeconds = cfg.alterLogDirsReplicationQuotaWindowSizeSeconds
+ )
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
new file mode 100644
index 0000000..bded125
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsManager.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+
+import kafka.cluster.BrokerEndPoint
+
+class ReplicaAlterLogDirsManager(brokerConfig: KafkaConfig,
+ replicaManager: ReplicaManager,
+ quotaManager: ReplicationQuotaManager,
+ brokerTopicStats: BrokerTopicStats)
+ extends AbstractFetcherManager(s"ReplicaAlterLogDirsManager on broker ${brokerConfig.brokerId}",
+ "ReplicaAlterLogDirs", brokerConfig.getNumReplicaAlterLogDirsThreads) {
+
+ override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+ val threadName = s"ReplicaAlterLogDirsThread-$fetcherId"
+ new ReplicaAlterLogDirsThread(threadName, sourceBroker, brokerConfig, replicaManager, quotaManager, brokerTopicStats)
+ }
+
+ def shutdown() {
+ info("shutting down")
+ closeAllFetchers()
+ info("shutdown completed")
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
new file mode 100644
index 0000000..48c83d4
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.ByteBuffer
+import java.util
+
+import AbstractFetcherThread.ResultWithPartitions
+import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchRequest => JFetchRequest}
+import ReplicaAlterLogDirsThread.FetchRequest
+import ReplicaAlterLogDirsThread.PartitionData
+import kafka.api.Request
+import kafka.server.QuotaFactory.UnboundedQuota
+import kafka.server.epoch.LeaderEpochCache
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+
+import scala.collection.JavaConverters._
+import scala.collection.{Map, Seq, Set, mutable}
+
+
+class ReplicaAlterLogDirsThread(name: String,
+ sourceBroker: BrokerEndPoint,
+ brokerConfig: KafkaConfig,
+ replicaMgr: ReplicaManager,
+ quota: ReplicationQuotaManager,
+ brokerTopicStats: BrokerTopicStats)
+ extends AbstractFetcherThread(name = name,
+ clientId = name,
+ sourceBroker = sourceBroker,
+ fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
+ isInterruptible = false,
+ includeLogTruncation = true) {
+
+ type REQ = FetchRequest
+ type PD = PartitionData
+
+ private val replicaId = brokerConfig.brokerId
+ private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
+ private val fetchSize = brokerConfig.replicaFetchMaxBytes
+
+ private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get)
+
+ def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
+ var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = null
+ val request = fetchRequest.underlying.build()
+
+ def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]) {
+ partitionData = responsePartitionData.map { case (tp, data) =>
+ val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
+ val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+ tp -> new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
+ data.logStartOffset, abortedTransactions, data.records)
+ }
+ }
+
+ replicaMgr.fetchMessages(
+ 0L, // timeout is 0 so that the callback will be executed immediately
+ Request.FutureLocalReplicaId,
+ request.minBytes,
+ request.maxBytes,
+ request.version <= 2,
+ request.fetchData.asScala.toSeq,
+ UnboundedQuota,
+ processResponseCallback,
+ request.isolationLevel)
+
+ if (partitionData == null)
+ throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
+
+ partitionData.map { case (key, value) =>
+ key -> new PartitionData(value)
+ }
+ }
+
+ // process fetched data
+ def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
+ val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
+ val partition = replicaMgr.getPartition(topicPartition).get
+ val records = partitionData.toRecords
+
+ if (fetchOffset != futureReplica.logEndOffset.messageOffset)
+ throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
+ topicPartition, fetchOffset, futureReplica.logEndOffset.messageOffset))
+
+ // Append the leader's messages to the log
+ partition.appendRecordsToFutureReplica(records)
+ futureReplica.highWatermark = new LogOffsetMetadata(partitionData.highWatermark)
+ futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
+
+ if (partition.maybeReplaceCurrentWithFutureReplica())
+ removePartitions(Set(topicPartition))
+
+ quota.record(records.sizeInBytes)
+ }
+
+ def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
+ val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
+ val currentReplica = replicaMgr.getReplicaOrException(topicPartition)
+ val partition = replicaMgr.getPartition(topicPartition).get
+ val logEndOffset: Long = currentReplica.logEndOffset.messageOffset
+
+ if (logEndOffset < futureReplica.logEndOffset.messageOffset) {
+ warn("Future replica for partition %s reset its fetch offset from %d to current replica's latest offset %d"
+ .format(topicPartition, futureReplica.logEndOffset.messageOffset, logEndOffset))
+ partition.truncateTo(logEndOffset, isFuture = true)
+ logEndOffset
+ } else {
+ val currentReplicaStartOffset: Long = currentReplica.logStartOffset
+ warn("Future replica for partition %s reset its fetch offset from %d to current replica's start offset %d"
+ .format(topicPartition, futureReplica.logEndOffset.messageOffset, currentReplicaStartOffset))
+ val offsetToFetch = Math.max(currentReplicaStartOffset, futureReplica.logEndOffset.messageOffset)
+ // Only truncate the log when current replica's log start offset is greater than future replica's log end offset.
+ if (currentReplicaStartOffset > futureReplica.logEndOffset.messageOffset)
+ partition.truncateFullyAndStartAt(currentReplicaStartOffset, isFuture = true)
+ offsetToFetch
+ }
+ }
+
+ def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
+ if (partitions.nonEmpty)
+ delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
+ }
+
+ def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
+ val partitionEpochOpts = allPartitions
+ .filter { case (_, state) => state.isTruncatingLog }
+ .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap
+
+ val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty }
+
+ val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+ ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
+ }
+
+ def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
+ partitions.map { case (tp, epoch) =>
+ try {
+ tp -> new EpochEndOffset(Errors.NONE, replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch))
+ } catch {
+ case t: Throwable =>
+ warn(s"Error when getting EpochEndOffset for $tp", t)
+ tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH_OFFSET)
+ }
+ }
+ }
+
+ def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
+ val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
+ val partitionsWithError = mutable.Set[TopicPartition]()
+
+ fetchedEpochs.foreach { case (topicPartition, epochOffset) =>
+ try {
+ val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
+ val partition = replicaMgr.getPartition(topicPartition).get
+
+ if (epochOffset.hasError) {
+ info(s"Retrying leaderEpoch request for partition $topicPartition as the current replica reported an error: ${epochOffset.error}")
+ partitionsWithError += topicPartition
+ } else {
+ val fetchOffset =
+ if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
+ partitionStates.stateValue(topicPartition).fetchOffset
+ else if (epochOffset.endOffset >= futureReplica.logEndOffset.messageOffset)
+ futureReplica.logEndOffset.messageOffset
+ else
+ epochOffset.endOffset
+
+ partition.truncateTo(fetchOffset, isFuture = true)
+ fetchOffsets.put(topicPartition, fetchOffset)
+ }
+ } catch {
+ case e: KafkaStorageException =>
+ info(s"Failed to truncate $topicPartition", e)
+ partitionsWithError += topicPartition
+ }
+ }
+ ResultWithPartitions(fetchOffsets, partitionsWithError)
+ }
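
The truncation offset chosen above is either the partition's current fetch offset (when the epoch end offset is undefined) or the smaller of the reported epoch end offset and the future replica's log end offset. A one-function sketch of that choice, assuming the -1 sentinel commonly used for UNDEFINED_EPOCH_OFFSET:

    object TruncationOffsetSketch {
      val UndefinedEpochOffset: Long = -1L // assumed sentinel for an undefined epoch end offset

      def choose(epochEndOffset: Long, futureLogEndOffset: Long, fetchStateOffset: Long): Long =
        if (epochEndOffset == UndefinedEpochOffset) fetchStateOffset
        else math.min(epochEndOffset, futureLogEndOffset)
    }
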
+
+ def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
+ // Only include a replica in the fetch request if it is ready for fetch and the fetch quota has not been exceeded.
+ val maxPartitionOpt = partitionMap.filter { case (topicPartition, partitionFetchState) =>
+ partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
+ }.reduceLeftOption { (left, right) =>
+ if (left._1.topic > right._1.topic || (left._1.topic == right._1.topic && left._1.partition >= right._1.partition))
+ left
+ else
+ right
+ }
+
+ // Only move one replica at a time to increase its catch-up rate and thus reduce the time spent on moving any given replica
+ // Replicas are ordered by their TopicPartition
+ val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData]
+ val partitionsWithError = mutable.Set[TopicPartition]()
+
+ if (maxPartitionOpt.nonEmpty) {
+ val (topicPartition, partitionFetchState) = maxPartitionOpt.get
+ try {
+ val logStartOffset = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId).logStartOffset
+ requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
+ } catch {
+ case e: KafkaStorageException =>
+ partitionsWithError += topicPartition
+ }
+ }
+ // Set maxWait and minBytes to 0 so that the local fetch returns immediately even when
+ // the future log has caught up with the current log of the partition and there is no new data to copy
+ val requestBuilder = JFetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
+ ResultWithPartitions(new FetchRequest(requestBuilder), partitionsWithError)
+ }
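
buildFetchRequest above deliberately fetches a single partition per request: among the partitions that are ready and within quota it keeps the greatest one under (topic, partition) ordering, so only one replica is moved at a time and the choice is deterministic. A sketch of just the selection step, with plain (topic, partition) tuples standing in for TopicPartition:

    object MaxPartitionSketch {
      def pick(candidates: Seq[(String, Int)]): Option[(String, Int)] =
        candidates.reduceLeftOption { (left, right) =>
          // Keep the pair that sorts last by topic name, then by partition id.
          if (left._1 > right._1 || (left._1 == right._1 && left._2 >= right._2)) left
          else right
        }
    }

For example, pick(Seq(("a", 0), ("b", 2), ("b", 1))) returns Some(("b", 2)).
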
+}
+
+object ReplicaAlterLogDirsThread {
+
+ private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest {
+ def isEmpty: Boolean = underlying.fetchData.isEmpty
+ def offset(topicPartition: TopicPartition): Long = underlying.fetchData.asScala(topicPartition).fetchOffset
+ override def toString = underlying.toString
+ }
+
+ private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
+
+ def error = underlying.error
+
+ def toRecords: MemoryRecords = {
+ if (underlying.records == MemoryRecords.EMPTY)
+ underlying.records.asInstanceOf[MemoryRecords]
+ else {
+ val buffer = ByteBuffer.allocate(underlying.records.sizeInBytes())
+ underlying.records.asInstanceOf[FileRecords].readInto(buffer, 0)
+ MemoryRecords.readableRecords(buffer)
+ }
+ }
+
+ def highWatermark: Long = underlying.highWatermark
+
+ def logStartOffset: Long = underlying.logStartOffset
+
+ def exception: Option[Throwable] = error match {
+ case Errors.NONE => None
+ case e => Some(e.exception)
+ }
+
+ override def toString = underlying.toString
+ }
+}
\ No newline at end of file
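
One detail worth calling out in the PartitionData wrapper above: because this fetch is served by the local replica, the response can carry FileRecords, and toRecords copies them into a heap buffer before they are appended to the future log. A hedged sketch of that copy, using only the record APIs already referenced above:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}

    object ToMemoryRecordsSketch {
      // Copies file-backed records into an in-memory buffer; memory records pass through as-is.
      def toMemoryRecords(records: Records): MemoryRecords = records match {
        case m: MemoryRecords => m
        case f: FileRecords =>
          val buffer = ByteBuffer.allocate(f.sizeInBytes())
          f.readInto(buffer, 0)
          MemoryRecords.readableRecords(buffer)
      }
    }
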
http://git-wip-us.apache.org/repos/asf/kafka/blob/5cc162f3/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 52c6de3..a8acde4 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -87,19 +87,21 @@ class ReplicaFetcherThread(name: String,
// process fetched data
def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
val replica = replicaMgr.getReplicaOrException(topicPartition)
+ val partition = replicaMgr.getPartition(topicPartition).get
val records = partitionData.toRecords
maybeWarnIfOversizedRecords(records, topicPartition)
if (fetchOffset != replica.logEndOffset.messageOffset)
- throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
+ throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, replica.logEndOffset.messageOffset))
+
if (logger.isTraceEnabled)
trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
// Append the leader's messages to the log
- replica.log.get.appendAsFollower(records)
+ partition.appendRecordsToFollower(records)
if (logger.isTraceEnabled)
trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
@@ -132,6 +134,7 @@ class ReplicaFetcherThread(name: String,
*/
def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
val replica = replicaMgr.getReplicaOrException(topicPartition)
+ val partition = replicaMgr.getPartition(topicPartition).get
/**
* Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
@@ -159,7 +162,8 @@ class ReplicaFetcherThread(name: String,
warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
s"leader's latest offset $leaderEndOffset")
- replicaMgr.logManager.truncateTo(Map(topicPartition -> leaderEndOffset))
+ partition.truncateTo(leaderEndOffset, isFuture = false)
+ replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, topicPartition, leaderEndOffset)
leaderEndOffset
} else {
/**
@@ -189,8 +193,9 @@ class ReplicaFetcherThread(name: String,
s"leader's start offset $leaderStartOffset")
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
// Only truncate log when current leader's log start offset is greater than follower's log end offset.
- if (leaderStartOffset > replica.logEndOffset.messageOffset)
- replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset)
+ if (leaderStartOffset > replica.logEndOffset.messageOffset) {
+ partition.truncateFullyAndStartAt(leaderStartOffset, isFuture = false)
+ }
offsetToFetch
}
}
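
In the hunks above, follower truncation now goes through Partition (truncateTo / truncateFullyAndStartAt with isFuture = false), and when the current replica is truncated to the leader's end offset the ReplicaAlterLogDirsManager is told to re-truncate any in-flight future replica as well, since a future log copying from the current log must not keep offsets the current log no longer has. A tiny sketch of that invariant (function and object names here are hypothetical, not the manager's API):

    object PropagateTruncationSketch {
      // After the current log is truncated to truncationOffset, a future replica that was
      // copying from it must not fetch past that point either.
      def capFutureFetchOffset(futureFetchOffset: Long, truncationOffset: Long): Long =
        math.min(futureFetchOffset, truncationOffset)
    }
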
@@ -261,27 +266,31 @@ class ReplicaFetcherThread(name: String,
* - If the leader replied with undefined epoch offset we must use the high watermark
*/
override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
- val truncationPoints = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
+ val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
val partitionsWithError = mutable.Set[TopicPartition]()
fetchedEpochs.foreach { case (tp, epochOffset) =>
try {
val replica = replicaMgr.getReplicaOrException(tp)
+ val partition = replicaMgr.getPartition(tp).get
if (epochOffset.hasError) {
info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}")
partitionsWithError += tp
} else {
- val truncationOffset =
- if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
- highWatermark(replica, epochOffset)
- else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
+ val fetchOffset =
+ if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
+ warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
+ s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
+ partitionStates.stateValue(tp).fetchOffset
+ } else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
logEndOffset(replica, epochOffset)
else
epochOffset.endOffset
- replicaMgr.logManager.truncateTo(Map(tp -> truncationOffset))
- truncationPoints.put(tp, truncationOffset)
+ partition.truncateTo(fetchOffset, isFuture = false)
+ replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, fetchOffset)
+ fetchOffsets.put(tp, fetchOffset)
}
} catch {
case e: KafkaStorageException =>
@@ -290,7 +299,7 @@ class ReplicaFetcherThread(name: String,
}
}
- ResultWithPartitions(truncationPoints, partitionsWithError)
+ ResultWithPartitions(fetchOffsets, partitionsWithError)
}
override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
@@ -340,12 +349,6 @@ class ReplicaFetcherThread(name: String,
logEndOffset
}
- private def highWatermark(replica: Replica, epochOffset: EpochEndOffset): Long = {
- warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
- s"High watermark ${replica.highWatermark.messageOffset} will be used for truncation.")
- replica.highWatermark.messageOffset
- }
-
/**
* To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync.