Posted to commits@kafka.apache.org by jq...@apache.org on 2017/03/28 16:59:50 UTC
[2/3] kafka git commit: KAFKA-4586; Add purgeDataBefore() API (KIP-107)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index c2d34d9..ddb2411 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,7 +28,7 @@ import kafka.controller.KafkaController
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
+import org.apache.kafka.common.errors.{PolicyViolationException, NotEnoughReplicasException, NotLeaderForPartitionException}
import org.apache.kafka.common.protocol.Errors
import scala.collection.JavaConverters._
@@ -235,10 +235,20 @@ class Partition(val topic: String,
def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
getReplica(replicaId) match {
case Some(replica) =>
+ // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
+ val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
replica.updateLogReadResult(logReadResult)
+ val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
+ // check if the LW of the partition has incremented
+ // since the replica's logStartOffset may have incremented
+ val leaderLWIncremented = newLeaderLW > oldLeaderLW
// check if we need to expand ISR to include this replica
// if it is not in the ISR yet
- maybeExpandIsr(replicaId, logReadResult)
+ val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
+
+ // some delayed operations may be unblocked after HW or LW changed
+ if (leaderLWIncremented || leaderHWIncremented)
+ tryCompleteDelayedRequests()
debug("Recorded replica %d log end offset (LEO) position %d for partition %s."
.format(replicaId, logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
@@ -263,8 +273,8 @@ class Partition(val topic: String,
*
* This function can be triggered when a replica's LEO has incremented
*/
- def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) {
- val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
+ def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
+ inWriteLock(leaderIsrUpdateLock) {
// check if this replica needs to be added to the ISR
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
@@ -280,18 +290,12 @@ class Partition(val topic: String,
updateIsr(newInSyncReplicas)
replicaManager.isrExpandRate.mark()
}
-
// check if the HW of the partition can now be incremented
// since the replica may already be in the ISR and its LEO has just incremented
maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
-
case None => false // nothing to do if no longer leader
}
}
-
- // some delayed operations may be unblocked after HW changed
- if (leaderHWIncremented)
- tryCompleteDelayedRequests()
}
/*
@@ -376,12 +380,25 @@ class Partition(val topic: String,
}
/**
+ * The low watermark offset value, calculated only if the local replica is the partition leader.
+ * It is only used by the leader broker to decide when a DeleteRecordsRequest is satisfied. Its value is the minimum logStartOffset of all live replicas.
+ * The low watermark will increase when the leader broker receives either a FetchRequest or a DeleteRecordsRequest.
+ */
+ def lowWatermarkIfLeader: Long = {
+ if (!isLeaderReplicaLocal)
+ throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d".format(topicPartition, localBrokerId))
+ assignedReplicas.filter(replica =>
+ replicaManager.metadataCache.isBrokerAlive(replica.brokerId)).map(_.logStartOffset).reduceOption(_ min _).getOrElse(0L)
+ }
+
+ /**
* Try to complete any pending requests. This should be called without holding the leaderIsrUpdateLock.
*/
private def tryCompleteDelayedRequests() {
val requestKey = new TopicPartitionOperationKey(topicPartition)
replicaManager.tryCompleteDelayedFetch(requestKey)
replicaManager.tryCompleteDelayedProduce(requestKey)
+ replicaManager.tryCompleteDelayedDeleteRecords(requestKey)
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
@@ -467,6 +484,27 @@ class Partition(val topic: String,
info
}
+ /**
+ * Update logStartOffset and the low watermark if 1) offset <= highWatermark and 2) this replica is the leader.
+ * This function can trigger log segment deletion and log rolling.
+ *
+ * Returns the low watermark of the partition.
+ */
+ def deleteRecordsOnLeader(offset: Long): Long = {
+ inReadLock(leaderIsrUpdateLock) {
+ leaderReplicaIfLocal match {
+ case Some(leaderReplica) =>
+ if (!leaderReplica.log.get.config.delete)
+ throw new PolicyViolationException("Records of partition %s can not be deleted due to the configured policy".format(topicPartition))
+ leaderReplica.maybeIncrementLogStartOffset(offset)
+ lowWatermarkIfLeader
+ case None =>
+ throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+ .format(topicPartition, localBrokerId))
+ }
+ }
+ }
+
private def updateIsr(newIsr: Set[Replica]) {
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
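A note on the low watermark calculation above: the leader simply takes the minimum logStartOffset over replicas whose brokers the metadata cache reports as alive. A self-contained sketch of that rule, not part of the patch (ReplicaInfo and the liveness predicate are hypothetical stand-ins for Replica and MetadataCache.isBrokerAlive):

    // Sketch only: derive the partition low watermark from the log start
    // offsets of replicas hosted on live brokers.
    case class ReplicaInfo(brokerId: Int, logStartOffset: Long)

    def lowWatermark(assignedReplicas: Seq[ReplicaInfo], isBrokerAlive: Int => Boolean): Long =
      assignedReplicas
        .filter(r => isBrokerAlive(r.brokerId)) // ignore replicas on dead brokers
        .map(_.logStartOffset)
        .reduceOption(_ min _)                  // the minimum log start offset wins
        .getOrElse(0L)                          // no live replica: fall back to 0

    // replicas at offsets 10, 7 and 12, with the broker holding offset 7 down => LW is 10
    val lw = lowWatermark(Seq(ReplicaInfo(1, 10L), ReplicaInfo(2, 7L), ReplicaInfo(3, 12L)),
                          brokerId => brokerId != 2)
    assert(lw == 10L)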
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 8597b06..3995f9e 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -21,7 +21,8 @@ import kafka.log.Log
import kafka.utils.Logging
import kafka.server.{LogOffsetMetadata, LogReadResult}
import kafka.common.KafkaException
-import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+
import org.apache.kafka.common.utils.Time
@@ -35,6 +36,9 @@ class Replica(val brokerId: Int,
// the log end offset value, kept in all replicas;
// for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
+ // the log start offset value, kept in all replicas;
+ // for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch
+ @volatile private[this] var _logStartOffset = Log.UnknownLogStartOffset
// The log end offset value at the time the leader received the last FetchRequest from this follower
// This is used to determine the lastCaughtUpTimeMs of the follower
@@ -72,6 +76,7 @@ class Replica(val brokerId: Int,
else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
_lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)
+ logStartOffset = logReadResult.followerLogStartOffset
logEndOffset = logReadResult.info.fetchOffsetMetadata
lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
lastFetchTimeMs = logReadResult.fetchTimeMs
@@ -98,6 +103,33 @@ class Replica(val brokerId: Int,
else
logEndOffsetMetadata
+ def maybeIncrementLogStartOffset(offset: Long) {
+ if (isLocal) {
+ if (highWatermark.messageOffset < offset)
+ throw new OffsetOutOfRangeException(s"The specified offset $offset is higher than the high watermark" +
+ s" ${highWatermark.messageOffset} of the partition $topicPartition")
+ log.get.maybeIncrementLogStartOffset(offset)
+ } else {
+ throw new KafkaException(s"Should not try to delete records on partition $topicPartition's non-local replica $brokerId")
+ }
+ }
+
+ private def logStartOffset_=(newLogStartOffset: Long) {
+ if (isLocal) {
+ throw new KafkaException(s"Should not set log start offset on partition $topicPartition's local replica $brokerId " +
+ s"without attempting to delete records of the log")
+ } else {
+ _logStartOffset = newLogStartOffset
+ trace(s"Setting log start offset for remote replica $brokerId for partition $topicPartition to [$newLogStartOffset]")
+ }
+ }
+
+ def logStartOffset =
+ if (isLocal)
+ log.get.logStartOffset
+ else
+ _logStartOffset
+
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
if (isLocal) {
highWatermarkMetadata = newHighWatermark
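The Replica changes above keep logStartOffset in all replicas but guard the two write paths differently: a remote replica's value is assigned directly from follower fetch data, while the local replica's value may only move forward through maybeIncrementLogStartOffset, which rejects offsets above the high watermark. A minimal sketch of that guard over plain Longs (the broker throws OffsetOutOfRangeException rather than IllegalArgumentException):

    // Sketch only: records at or above the high watermark must survive a
    // DeleteRecordsRequest, so the requested offset is validated first.
    def validateDeleteRecordsOffset(requestedOffset: Long, highWatermark: Long): Unit =
      if (highWatermark < requestedOffset)
        throw new IllegalArgumentException(
          s"The specified offset $requestedOffset is higher than the high watermark $highWatermark")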
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 75b1f24..c4b7ce6 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -101,8 +101,7 @@ class ConsumerFetcherThread(name: String,
protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
if (partitionFetchState.isActive)
- fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.offset,
- fetchSize)
+ fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.fetchOffset, fetchSize)
}
new FetchRequest(fetchRequestBuilder.build())
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d2cac23..96535b1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -79,6 +79,17 @@ case class LogAppendInfo(var firstOffset: Long,
*
* @param dir The directory in which log segments are created.
* @param config The log configuration settings
+ * @param logStartOffset The earliest offset allowed to be exposed to a Kafka client.
+ * The logStartOffset can be updated by:
+ * - the user's DeleteRecordsRequest
+ * - the broker's log retention
+ * - the broker's log truncation
+ * The logStartOffset is used to decide the following:
+ * - Log deletion. A LogSegment whose nextOffset <= log's logStartOffset can be deleted.
+ * This may trigger log rolling if the active segment is deleted.
+ * - The earliest offset of the log in response to ListOffsetRequest. To avoid an OffsetOutOfRange exception after the user seeks to the earliest offset,
+ * we make sure that logStartOffset <= log's highWatermark.
+ * Other activities such as log cleaning are not affected by logStartOffset.
* @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
* @param scheduler The thread pool scheduler used for background actions
* @param time The time instance used for checking the clock
@@ -87,6 +98,7 @@ case class LogAppendInfo(var firstOffset: Long,
@threadsafe
class Log(@volatile var dir: File,
@volatile var config: LogConfig,
+ @volatile var logStartOffset: Long = 0L,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
@@ -118,8 +130,10 @@ class Log(@volatile var dir: File,
nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
activeSegment.size.toInt)
- info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
- .format(name, segments.size(), logEndOffset, time.milliseconds - startMs))
+ logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+
+ info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
+ .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
}
val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
@@ -443,6 +457,20 @@ class Log(@volatile var dir: File,
}
}
+ /*
+ * Increment the log start offset if the provided offset is larger.
+ */
+ def maybeIncrementLogStartOffset(offset: Long) {
+ // We don't have to write the log start offset to log-start-offset-checkpoint immediately.
+ // The deleteRecordsOffset may be lost only if all in-sync replicas of this broker are shut down
+ // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
+ lock synchronized {
+ if (offset > logStartOffset) {
+ logStartOffset = offset
+ }
+ }
+ }
+
/**
* Validate the following:
* <ol>
@@ -543,7 +571,7 @@ class Log(@volatile var dir: File,
* @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
* @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
*
- * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the base offset of the first segment.
+ * @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
* @return The fetch data information including fetch starting offset metadata and messages read.
*/
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
@@ -558,9 +586,9 @@ class Log(@volatile var dir: File,
var entry = segments.floorEntry(startOffset)
- // attempt to read beyond the log end offset is an error
- if(startOffset > next || entry == null)
- throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
+ // return an error on an attempt to read beyond the log end offset or below the log start offset
+ if(startOffset > next || entry == null || startOffset < logStartOffset)
+ throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
@@ -626,7 +654,7 @@ class Log(@volatile var dir: File,
val segmentsCopy = logSegments.toBuffer
// For the earliest and latest, we do not need to return the timestamp.
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
- return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, segmentsCopy.head.baseOffset))
+ return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
@@ -640,7 +668,7 @@ class Log(@volatile var dir: File,
None
}
- targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))
+ targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
}
/**
@@ -666,16 +694,21 @@ class Log(@volatile var dir: File,
private def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
- val numToDelete = deletable.size
- if (numToDelete > 0) {
- // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
- if (segments.size == numToDelete)
- roll()
- // remove the segments for lookups
- deletable.foreach(deleteSegment)
- }
- numToDelete
+ deleteSegments(deletable)
+ }
+ }
+
+ private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
+ val numToDelete = deletable.size
+ if (numToDelete > 0) {
+ // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
+ if (segments.size == numToDelete)
+ roll()
+ // remove the segments for lookups
+ deletable.foreach(deleteSegment)
+ logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
}
+ numToDelete
}
/**
@@ -696,10 +729,10 @@ class Log(@volatile var dir: File,
*/
def deleteOldSegments(): Int = {
if (!config.delete) return 0
- deleteRetenionMsBreachedSegments() + deleteRetentionSizeBreachedSegments()
+ deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
}
- private def deleteRetenionMsBreachedSegments() : Int = {
+ private def deleteRetentionMsBreachedSegments() : Int = {
if (config.retentionMs < 0) return 0
val startMs = time.milliseconds
deleteOldSegments(startMs - _.largestTimestamp > config.retentionMs)
@@ -719,16 +752,27 @@ class Log(@volatile var dir: File,
deleteOldSegments(shouldDelete)
}
+ private def deleteLogStartOffsetBreachedSegments() : Int = {
+ // keep the active segment to avoid frequent log rolling due to the user's DeleteRecordsRequest
+ lock synchronized {
+ val deletable = {
+ if (segments.size() < 2)
+ Seq.empty
+ else
+ logSegments.sliding(2).takeWhile { iterable =>
+ val nextSegment = iterable.toSeq(1)
+ nextSegment.baseOffset <= logStartOffset
+ }.map(_.toSeq(0)).toSeq
+ }
+ deleteSegments(deletable)
+ }
+ }
+
/**
* The size of the log in bytes
*/
def size: Long = logSegments.map(_.size).sum
- /**
- * The earliest message offset in the log
- */
- def logStartOffset: Long = logSegments.head.baseOffset
-
/**
* The offset metadata of the next message that will be appended to the log
*/
@@ -789,7 +833,7 @@ class Log(@volatile var dir: File,
def roll(expectedNextOffset: Long = 0): LogSegment = {
val start = time.nanoseconds
lock synchronized {
- val newOffset = Math.max(expectedNextOffset, logEndOffset)
+ val newOffset = math.max(expectedNextOffset, logEndOffset)
val logFile = logFilename(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
val timeIndexFile = timeIndexFilename(dir, newOffset)
@@ -895,6 +939,7 @@ class Log(@volatile var dir: File,
activeSegment.truncateTo(targetOffset)
updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
+ this.logStartOffset = math.min(targetOffset, this.logStartOffset)
}
}
}
@@ -920,6 +965,7 @@ class Log(@volatile var dir: File,
preallocate = config.preallocate))
updateLogEndOffset(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
+ this.logStartOffset = newOffset
}
}
@@ -1082,6 +1128,8 @@ object Log {
/** a directory that is scheduled to be deleted */
val DeleteDirSuffix = "-delete"
+ val UnknownLogStartOffset = -1L
+
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
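The sliding-window selection in deleteLogStartOffsetBreachedSegments is worth spelling out: a segment is deletable only when the following segment's base offset is still at or below logStartOffset, which automatically spares the active segment. A sketch of the same rule over bare base offsets, not part of the patch:

    // Sketch only: pick the deletable prefix of segments given their base offsets.
    // A segment is deletable when the next segment starts at or below the log
    // start offset, so the deleted segment holds no offsets >= logStartOffset.
    def deletableBaseOffsets(baseOffsets: Seq[Long], logStartOffset: Long): Seq[Long] =
      if (baseOffsets.size < 2) Seq.empty // always keep at least the active segment
      else baseOffsets.sliding(2)
        .takeWhile { case Seq(_, next) => next <= logStartOffset }
        .map(_.head)
        .toSeq

    // segments starting at 0, 100, 200, 300 with logStartOffset = 250:
    // segments 0 and 100 are deletable; segment 200 still contains offset 250
    assert(deletableBaseOffsets(Seq(0L, 100L, 200L, 300L), 250L) == Seq(0L, 100L))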
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 761edf9..a555420 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -48,12 +48,14 @@ class LogManager(val logDirs: Array[File],
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
- val flushCheckpointMs: Long,
+ val flushRecoveryOffsetCheckpointMs: Long,
+ val flushStartOffsetCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
+ val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
@@ -64,6 +66,7 @@ class LogManager(val logDirs: Array[File],
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
+ private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, LogStartOffsetCheckpointFile)))).toMap
loadLogs()
// public, so we can access this from kafka.admin.DeleteTopicTest
@@ -139,10 +142,18 @@ class LogManager(val logDirs: Array[File],
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
case e: Exception =>
- warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e)
+ warn("Error occurred while reading recovery-point-offset-checkpoint file of directory " + dir, e)
warn("Resetting the recovery checkpoint to 0")
}
+ var logStartOffsets = Map[TopicPartition, Long]()
+ try {
+ logStartOffsets = this.logStartOffsetCheckpoints(dir).read
+ } catch {
+ case e: Exception =>
+ warn("Error occurred while reading log-start-offset-checkpoint file of directory " + dir, e)
+ }
+
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
@@ -153,8 +164,9 @@ class LogManager(val logDirs: Array[File],
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
+ val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
- val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
+ val current = new Log(logDir, config, logStartOffset, logRecoveryPoint, scheduler, time)
if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
this.logsToBeDeleted.add(current)
} else {
@@ -210,7 +222,12 @@ class LogManager(val logDirs: Array[File],
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
- period = flushCheckpointMs,
+ period = flushRecoveryOffsetCheckpointMs,
+ TimeUnit.MILLISECONDS)
+ scheduler.schedule("kafka-log-start-offset-checkpoint",
+ checkpointLogStartOffsets,
+ delay = InitialTaskDelayMs,
+ period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs",
deleteLogs,
@@ -263,7 +280,10 @@ class LogManager(val logDirs: Array[File],
// update the last flush point
debug("Updating recovery points at " + dir)
- checkpointLogsInDir(dir)
+ checkpointLogRecoveryOffsetsInDir(dir)
+
+ debug("Updating log start offsets at " + dir)
+ checkpointLogStartOffsetsInDir(dir)
// mark that the shutdown was clean by creating marker file
debug("Writing clean shutdown marker at " + dir)
@@ -333,13 +353,21 @@ class LogManager(val logDirs: Array[File],
* to avoid recovering the whole log on startup.
*/
def checkpointRecoveryPointOffsets() {
- this.logDirs.foreach(checkpointLogsInDir)
+ this.logDirs.foreach(checkpointLogRecoveryOffsetsInDir)
+ }
+
+ /**
+ * Write out the current log start offset for all logs to a text file in the log directory
+ * to avoid exposing data that has been deleted by a DeleteRecordsRequest
+ */
+ def checkpointLogStartOffsets() {
+ this.logDirs.foreach(checkpointLogStartOffsetsInDir)
}
/**
* Make a checkpoint for all logs in provided directory.
*/
- private def checkpointLogsInDir(dir: File): Unit = {
+ private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
val recoveryPoints = this.logsByDir.get(dir.toString)
if (recoveryPoints.isDefined) {
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
@@ -347,6 +375,17 @@ class LogManager(val logDirs: Array[File],
}
/**
+ * Checkpoint log start offset for all logs in provided directory.
+ */
+ private def checkpointLogStartOffsetsInDir(dir: File): Unit = {
+ val logs = this.logsByDir.get(dir.toString)
+ if (logs.isDefined) {
+ this.logStartOffsetCheckpoints(dir).write(
+ logs.get.filter{case (tp, log) => log.logStartOffset > log.logSegments.head.baseOffset}.mapValues(_.logStartOffset))
+ }
+ }
+
+ /**
* Get the log if it exists, otherwise return None
*/
def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
@@ -362,7 +401,7 @@ class LogManager(val logDirs: Array[File],
val dataDir = nextLogDir()
val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
dir.mkdirs()
- val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
+ val log = new Log(dir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler, time)
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic,
@@ -425,6 +464,7 @@ class LogManager(val logDirs: Array[File],
val renamedDir = new File(removedLog.dir.getParent, dirName)
val renameSuccessful = removedLog.dir.renameTo(renamedDir)
if (renameSuccessful) {
+ checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
removedLog.dir = renamedDir
// change the file pointers for log and index file
for (logSegment <- removedLog.logSegments) {
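checkpointLogStartOffsetsInDir writes an entry only for logs whose start offset has moved past the first segment's base offset, i.e. only where a DeleteRecordsRequest actually advanced the offset mid-segment; everything else is recoverable from the segment files themselves. A sketch of that filter (LogState is a hypothetical stand-in for Log):

    // Sketch only: checkpoint a partition's log start offset only when it has
    // moved past the first segment's base offset (deletion actually happened).
    case class LogState(logStartOffset: Long, firstSegmentBaseOffset: Long)

    def checkpointableOffsets(logs: Map[String, LogState]): Map[String, Long] =
      logs.filter { case (_, log) => log.logStartOffset > log.firstSegmentBaseOffset }
          .map { case (tp, log) => tp -> log.logStartOffset }

    // "topic-0" had records deleted mid-segment, "topic-1" did not
    val toWrite = checkpointableOffsets(Map(
      "topic-0" -> LogState(logStartOffset = 150L, firstSegmentBaseOffset = 100L),
      "topic-1" -> LogState(logStartOffset = 100L, firstSegmentBaseOffset = 100L)))
    assert(toWrite == Map("topic-0" -> 150L))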
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9263515..4e77625 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -387,32 +387,33 @@ class LogSegment(val log: FileRecords,
}
/**
- * Search the message offset based on timestamp.
- * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
- * greater than or equals to the target timestamp.
+ * Search the message offset based on timestamp and offset.
*
- * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
- * timestamp will be max timestamp in the segment.
+ * This method returns an option of TimestampOffset. The returned value is determined using the following ordered list of rules:
*
- * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
- * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
+ * - If all the messages in the segment have smaller offsets, return None
+ * - If all the messages in the segment have smaller timestamps, return None
+ * - If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
+ * the returned offset will be max(the base offset of the segment, startingOffset) and the timestamp will be Message.NoTimestamp.
+ * - Otherwise, return an option of TimestampOffset. The offset is the offset of the first message whose timestamp
+ * is greater than or equal to the target timestamp and whose offset is greater than or equal to the startingOffset.
*
- * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
- * from the indexed position. This could happen if the log is truncated after we get the indexed position but
- * before we scan the log from there. In this case we simply return None and the caller will need to check on
- * the truncated log and maybe retry or even do the search on another log segment.
+ * This method only returns None when 1) all messages' offsets are < startingOffset or 2) the log is not empty but we did not
+ * see any message when scanning the log from the indexed position. The latter could happen if the log is truncated
+ * after we get the indexed position but before we scan the log from there. In this case we simply return None and the
+ * caller will need to check on the truncated log and maybe retry or even do the search on another log segment.
*
* @param timestamp The timestamp to search for.
- * @return the timestamp and offset of the first message whose timestamp is larger than or equal to the
- * target timestamp. None will be returned if there is no such message.
+ * @param startingOffset The starting offset to search.
+ * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message.
*/
- def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
+ def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
// Get the index entry with a timestamp less than or equal to the target timestamp
val timestampOffset = timeIndex.lookup(timestamp)
- val position = index.lookup(timestampOffset.offset).position
+ val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position
// Search the timestamp
- Option(log.searchForTimestamp(timestamp, position)).map { timestampAndOffset =>
+ Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
}
}
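The effect of the new startingOffset parameter can be summarized as a pure selection rule: pick the first message whose timestamp and offset both clear their respective lower bounds. A toy sketch over in-memory (timestamp, offset) pairs, not part of the patch (the real method also consults the time and offset indexes):

    // Sketch only: first message with timestamp >= target and offset >= startingOffset.
    def findByTimestamp(messages: Seq[(Long, Long)], targetTimestamp: Long,
                        startingOffset: Long): Option[(Long, Long)] =
      messages.find { case (ts, off) => ts >= targetTimestamp && off >= startingOffset }

    val msgs = Seq((10L, 0L), (20L, 1L), (30L, 2L))
    assert(findByTimestamp(msgs, 20L, startingOffset = 2L) == Some((30L, 2L))) // offset bound applies
    assert(findByTimestamp(msgs, 40L, startingOffset = 0L) == None)            // all timestamps smaller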
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 8842724..14e56bd 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -63,13 +63,13 @@ abstract class AbstractFetcherThread(name: String,
/* callbacks to be defined in subclass */
// process fetched data
- def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
+ protected def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
// handle a partition whose offset is out of range and return a new fetch offset
- def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
+ protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
// deal with partitions with errors, potentially due to leadership changes
- def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
+ protected def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ
@@ -140,17 +140,17 @@ abstract class AbstractFetcherThread(name: String,
val partitionId = topicPartition.partition
Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
// we append to the log if the current offset is defined and it is the same as the offset requested during fetch
- if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
+ if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset) {
partitionData.error match {
case Errors.NONE =>
try {
val records = partitionData.toRecords
val newOffset = records.batches.asScala.lastOption.map(_.nextOffset).getOrElse(
- currentPartitionFetchState.offset)
+ currentPartitionFetchState.fetchOffset)
fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
- processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)
+ processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData)
val validBytes = records.validBytes
if (validBytes > 0) {
@@ -164,18 +164,18 @@ abstract class AbstractFetcherThread(name: String,
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
// 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
// should get fixed in the subsequent fetches
- logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
+ logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.fetchOffset + " error " + ime.getMessage)
updatePartitionsWithError(topicPartition);
case e: Throwable =>
throw new KafkaException("error processing data for partition [%s,%d] offset %d"
- .format(topic, partitionId, currentPartitionFetchState.offset), e)
+ .format(topic, partitionId, currentPartitionFetchState.fetchOffset), e)
}
case Errors.OFFSET_OUT_OF_RANGE =>
try {
val newOffset = handleOffsetOutOfRange(topicPartition)
partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
- .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
+ .format(currentPartitionFetchState.fetchOffset, topic, partitionId, newOffset))
} catch {
case e: FatalExitError => throw e
case e: Throwable =>
@@ -226,7 +226,7 @@ abstract class AbstractFetcherThread(name: String,
for (partition <- partitions) {
Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
if (currentPartitionFetchState.isActive)
- partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+ partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay)))
)
}
partitionMapCond.signalAll()
@@ -350,11 +350,11 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
/**
* case class to keep partition offset and its state(active, inactive)
*/
-case class PartitionFetchState(offset: Long, delay: DelayedItem) {
+case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem) {
- def this(offset: Long) = this(offset, new DelayedItem(0))
+ def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
- override def toString = "%d-%b".format(offset, isActive)
+ override def toString = "%d-%b".format(fetchOffset, isActive)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
new file mode 100644
index 0000000..e5b301c
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+
+import java.util.concurrent.TimeUnit
+
+import com.yammer.metrics.core.Meter
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.DeleteRecordsResponse
+
+import scala.collection._
+
+
+case class DeleteRecordsPartitionStatus(requiredOffset: Long,
+ responseStatus: DeleteRecordsResponse.PartitionResponse) {
+ @volatile var acksPending = false
+
+ override def toString = "[acksPending: %b, error: %s, lowWatermark: %d, requiredOffset: %d]"
+ .format(acksPending, responseStatus.error.toString, responseStatus.lowWatermark, requiredOffset)
+}
+
+/**
+ * A delayed delete records operation that can be created by the replica manager and watched
+ * in the delete records operation purgatory
+ */
+class DelayedDeleteRecords(delayMs: Long,
+ deleteRecordsStatus: Map[TopicPartition, DeleteRecordsPartitionStatus],
+ replicaManager: ReplicaManager,
+ responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit)
+ extends DelayedOperation(delayMs) {
+
+ // first update the acks pending variable according to the error code
+ deleteRecordsStatus.foreach { case (topicPartition, status) =>
+ if (status.responseStatus.error == Errors.NONE) {
+ // Timeout error state will be cleared when required acks are received
+ status.acksPending = true
+ status.responseStatus.error = Errors.REQUEST_TIMED_OUT
+ } else {
+ status.acksPending = false
+ }
+
+ trace("Initial partition status for %s is %s".format(topicPartition, status))
+ }
+
+ /**
+ * The delayed delete records operation can be completed if every partition specified in the request satisfies one of the following:
+ *
+ * 1) There was an error while checking if all replicas have caught up to the deleteRecordsOffset: set an error in the response
+ * 2) The low watermark of the partition has caught up to the deleteRecordsOffset: set the low watermark in the response
+ *
+ */
+ override def tryComplete(): Boolean = {
+ // check for each partition if it still has pending acks
+ deleteRecordsStatus.foreach { case (topicPartition, status) =>
+ trace(s"Checking delete records satisfaction for ${topicPartition}, current status $status")
+ // skip those partitions that have already been satisfied
+ if (status.acksPending) {
+ val (lowWatermarkReached, error, lw) = replicaManager.getPartition(topicPartition) match {
+ case Some(partition) =>
+ partition.leaderReplicaIfLocal match {
+ case Some(_) =>
+ val leaderLW = partition.lowWatermarkIfLeader
+ (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+ case None =>
+ (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+ }
+ case None =>
+ (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+ }
+ if (error != Errors.NONE || lowWatermarkReached) {
+ status.acksPending = false
+ status.responseStatus.error = error
+ status.responseStatus.lowWatermark = lw
+ }
+ }
+ }
+
+ // check if every partition has satisfied at least one of the cases above
+ if (!deleteRecordsStatus.values.exists(_.acksPending))
+ forceComplete()
+ else
+ false
+ }
+
+ override def onExpiration() {
+ deleteRecordsStatus.foreach { case (topicPartition, status) =>
+ if (status.acksPending) {
+ DelayedDeleteRecordsMetrics.recordExpiration(topicPartition)
+ }
+ }
+ }
+
+ /**
+ * Upon completion, return the current response status along with the error code per partition
+ */
+ override def onComplete() {
+ val responseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+ responseCallback(responseStatus)
+ }
+}
+
+object DelayedDeleteRecordsMetrics extends KafkaMetricsGroup {
+
+ private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
+
+ def recordExpiration(partition: TopicPartition) {
+ aggregateExpirationMeter.mark()
+ }
+}
+
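The per-partition completion rule in tryComplete() reduces to a small state machine: a partition is done once the leader's low watermark reaches the required offset, fails immediately on a leadership or metadata error, and otherwise stays pending; the whole operation completes when no partition is still pending. A self-contained sketch of that rule (the names here are illustrative, not the patch's):

    // Sketch only: outcome of checking one partition of a DeleteRecordsRequest.
    sealed trait PartitionOutcome
    case object Pending extends PartitionOutcome
    case class Done(lowWatermark: Long) extends PartitionOutcome
    case class Failed(error: String) extends PartitionOutcome

    def checkPartition(leaderLowWatermark: Option[Long], requiredOffset: Long): PartitionOutcome =
      leaderLowWatermark match {
        case None => Failed("NOT_LEADER_FOR_PARTITION")   // leadership moved away
        case Some(lw) if lw >= requiredOffset => Done(lw) // all live replicas caught up
        case Some(_) => Pending                           // keep waiting in the purgatory
      }

    // mirrors !deleteRecordsStatus.values.exists(_.acksPending)
    def canComplete(outcomes: Seq[PartitionOutcome]): Boolean =
      !outcomes.exists(_ == Pending)

    assert(canComplete(Seq(Done(5L), Failed("UNKNOWN_TOPIC_OR_PARTITION"))))
    assert(!canComplete(Seq(Done(5L), Pending)))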
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index a05131a..cbee78a 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -150,7 +150,7 @@ class DelayedFetch(delayMs: Long,
)
val fetchPartitionData = logReadResults.map { case (tp, result) =>
- tp -> FetchPartitionData(result.error, result.hw, result.info.records)
+ tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
}
responseCallback(fetchPartitionData)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0798efd..defbf34 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -99,6 +99,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
+ case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -147,7 +148,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val leaderAndIsrResponse =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
+ val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, onLeadershipChange)
new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
} else {
val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
@@ -199,7 +200,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val updateMetadataResponse =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
+ val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
if (deletedPartitions.nonEmpty)
coordinator.handleDeletedPartitions(deletedPartitions)
@@ -451,12 +452,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
}
val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
+ FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY))
}
// the callback for sending a fetch response
@@ -474,17 +475,17 @@ class KafkaApis(val requestChannel: RequestChannel,
val convertedData = replicaManager.getMagic(tp) match {
case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
trace(s"Down converting message to V0 for fetch request from $clientId")
- FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+ FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
trace(s"Down converting message to V1 for fetch request from $clientId")
- FetchPartitionData(data.error, data.hw, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+ FetchPartitionData(data.error, data.hw, data.logStartOffset, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
case _ => data
}
- tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LSO,
- null, convertedData.records)
+ tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+ convertedData.logStartOffset, null, convertedData.records)
}
}
@@ -728,7 +729,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new Array[(Long, Long)](segments.length)
for (i <- segments.indices)
- offsetTimeArray(i) = (segments(i).baseOffset, segments(i).lastModified)
+ offsetTimeArray(i) = (math.max(segments(i).baseOffset, log.logStartOffset), segments(i).lastModified)
if (lastSegmentHasSize)
offsetTimeArray(segments.length) = (log.logEndOffset, time.milliseconds)
@@ -1259,6 +1260,54 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
+ def handleDeleteRecordsRequest(request: RequestChannel.Request) {
+ val deleteRecordsRequest = request.body[DeleteRecordsRequest]
+
+ val (authorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = deleteRecordsRequest.partitionOffsets.asScala.partition {
+ case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
+ }
+
+ val (authorizedForDeleteTopics, unauthorizedForDeleteTopics) = authorizedForDescribeTopics.partition {
+ case (topicPartition, _) => authorize(request.session, Delete, new Resource(auth.Topic, topicPartition.topic))
+ }
+
+ // the callback for sending a DeleteRecordsResponse
+ def sendResponseCallback(responseStatus: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse]) {
+
+ val mergedResponseStatus = responseStatus ++
+ unauthorizedForDeleteTopics.mapValues(_ =>
+ new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED)) ++
+ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ =>
+ new DeleteRecordsResponse.PartitionResponse(DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION))
+
+ mergedResponseStatus.foreach { case (topicPartition, status) =>
+ if (status.error != Errors.NONE) {
+ debug("DeleteRecordsRequest with correlation id %d from client %s on partition %s failed due to %s".format(
+ request.header.correlationId,
+ request.header.clientId,
+ topicPartition,
+ status.error.exceptionName))
+ }
+ }
+
+ val respBody = new DeleteRecordsResponse(mergedResponseStatus.asJava)
+ requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
+
+ // When this callback is triggered, the remote API call has completed
+ request.apiRemoteCompleteTimeMs = time.milliseconds
+ }
+
+ if (authorizedForDeleteTopics.isEmpty)
+ sendResponseCallback(Map.empty)
+ else {
+ // call the replica manager to delete records from the replicas
+ replicaManager.deleteRecords(
+ deleteRecordsRequest.timeout.toLong,
+ authorizedForDeleteTopics.mapValues(_.toLong),
+ sendResponseCallback)
+ }
+ }
+
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 879bc51..fe6631e 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -100,6 +100,7 @@ object Defaults {
val LogDeleteDelayMs = 60000
val LogFlushSchedulerIntervalMs = Long.MaxValue
val LogFlushOffsetCheckpointIntervalMs = 60000
+ val LogFlushStartOffsetCheckpointIntervalMs = 60000
val LogPreAllocateEnable = false
// lazy val as `InterBrokerProtocolVersion` is defined later
lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
@@ -125,6 +126,7 @@ object Defaults {
val ReplicaHighWatermarkCheckpointIntervalMs = 5000L
val FetchPurgatoryPurgeIntervalRequests = 1000
val ProducerPurgatoryPurgeIntervalRequests = 1000
+ val DeleteRecordsPurgatoryPurgeIntervalRequests = 1
val AutoLeaderRebalanceEnable = true
val LeaderImbalancePerBrokerPercentage = 10
val LeaderImbalanceCheckIntervalSeconds = 300
@@ -273,6 +275,7 @@ object KafkaConfig {
val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
val LogFlushIntervalMsProp = "log.flush.interval.ms"
val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
+ val LogFlushStartOffsetCheckpointIntervalMsProp = "log.flush.start.offset.checkpoint.interval.ms"
val LogPreAllocateProp = "log.preallocate"
val LogMessageFormatVersionProp = LogConfigPrefix + "message.format.version"
val LogMessageTimestampTypeProp = LogConfigPrefix + "message.timestamp.type"
@@ -296,6 +299,7 @@ object KafkaConfig {
val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests"
+ val DeleteRecordsPurgatoryPurgeIntervalRequestsProp = "delete.records.purgatory.purge.interval.requests"
val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable"
val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage"
val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds"
@@ -468,6 +472,7 @@ object KafkaConfig {
val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk"
val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in " + LogFlushSchedulerIntervalMsProp + " is used"
val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point"
+ val LogFlushStartOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of log start offset"
val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true."
val LogMessageFormatVersionDoc = "Specify the message format version the broker will use to append messages to the logs. The value should be a valid ApiVersion. " +
"Some examples are: 0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format version, the " +
@@ -521,6 +526,7 @@ object KafkaConfig {
val ReplicaHighWatermarkCheckpointIntervalMsDoc = "The frequency with which the high watermark is saved out to disk"
val FetchPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the fetch request purgatory"
val ProducerPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the producer request purgatory"
+ val DeleteRecordsPurgatoryPurgeIntervalRequestsDoc = "The purge interval (in number of requests) of the delete records request purgatory"
val AutoLeaderRebalanceEnableDoc = "Enables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervals"
val LeaderImbalancePerBrokerPercentageDoc = "The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage."
val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller"
@@ -686,6 +692,7 @@ object KafkaConfig {
.define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc)
.define(LogFlushIntervalMsProp, LONG, null, HIGH, LogFlushIntervalMsDoc)
.define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc)
+ .define(LogFlushStartOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushStartOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushStartOffsetCheckpointIntervalMsDoc)
.define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc)
.define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
.define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
@@ -710,6 +717,7 @@ object KafkaConfig {
.define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc)
.define(FetchPurgatoryPurgeIntervalRequestsProp, INT, Defaults.FetchPurgatoryPurgeIntervalRequests, MEDIUM, FetchPurgatoryPurgeIntervalRequestsDoc)
.define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc)
+ .define(DeleteRecordsPurgatoryPurgeIntervalRequestsProp, INT, Defaults.DeleteRecordsPurgatoryPurgeIntervalRequests, MEDIUM, DeleteRecordsPurgatoryPurgeIntervalRequestsDoc)
.define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc)
.define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc)
.define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
@@ -862,6 +870,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val numRecoveryThreadsPerDataDir = getInt(KafkaConfig.NumRecoveryThreadsPerDataDirProp)
val logFlushSchedulerIntervalMs = getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)
val logFlushOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushOffsetCheckpointIntervalMsProp).toLong
+ val logFlushStartOffsetCheckpointIntervalMs = getInt(KafkaConfig.LogFlushStartOffsetCheckpointIntervalMsProp).toLong
val logCleanupIntervalMs = getLong(KafkaConfig.LogCleanupIntervalMsProp)
val logCleanupPolicy = getList(KafkaConfig.LogCleanupPolicyProp)
val offsetsRetentionMinutes = getInt(KafkaConfig.OffsetsRetentionMinutesProp)
@@ -907,6 +916,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val replicaHighWatermarkCheckpointIntervalMs = getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
val fetchPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
val producerPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
+ val deleteRecordsPurgatoryPurgeIntervalRequests = getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
val autoLeaderRebalanceEnable = getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
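For operators, the two new knobs surface in server.properties; a snippet showing them with the defaults defined above (so this configuration is a no-op):

    # how often the log-start-offset-checkpoint file is written out
    log.flush.start.offset.checkpoint.interval.ms=60000
    # purge interval (in number of requests) of the delete records request purgatory
    delete.records.purgatory.purge.interval.requests=1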
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 465b0b7..0d3e49c 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -297,7 +297,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
}
protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
- new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower)
+ new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower, metadataCache)
private def initZk(): ZkUtils = {
info(s"Connecting to zookeeper on ${config.zkConnect}")
@@ -655,7 +655,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
cleanerConfig = cleanerConfig,
ioThreads = config.numRecoveryThreadsPerDataDir,
flushCheckMs = config.logFlushSchedulerIntervalMs,
- flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+ flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+ flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
scheduler = kafkaScheduler,
brokerState = brokerState,
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 9a6090d..bf36974 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -132,6 +132,12 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
}
}
+ def isBrokerAlive(brokerId: Int): Boolean = {
+ inReadLock(partitionMetadataLock) {
+ aliveBrokers.contains(brokerId)
+ }
+ }
+
def getAliveBrokers: Seq[Broker] = {
inReadLock(partitionMetadataLock) {
aliveBrokers.values.toBuffer
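The new isBrokerAlive accessor gives callers a lock-safe liveness check against the metadata cache. A minimal sketch of the intended use when deciding which replicas count toward the low watermark (partition and metadataCache are assumed in scope, and Replica is assumed to expose the logStartOffset this patch adds; the real call site is the DelayedDeleteRecords operation elsewhere in this patch):

    // keep only replicas whose brokers the metadata cache currently considers alive
    val liveReplicas = partition.assignedReplicas.filter(r => metadataCache.isBrokerAlive(r.brokerId))
    // the low watermark is the minimum logStartOffset across live replicas
    val lowWatermark = if (liveReplicas.nonEmpty) liveReplicas.map(_.logStartOffset).min else -1L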
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 29a2467..5f055a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -60,7 +60,8 @@ class ReplicaFetcherThread(name: String,
type PD = PartitionData
private val fetchRequestVersion: Short =
- if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
+ if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
+ else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
(Fetch request v5, gated on KAFKA_0_11_0_IV1 above, is the version that adds the log start offset field used below.)
@@ -136,10 +137,12 @@ class ReplicaFetcherThread(name: String,
trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition))
val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
+ val leaderLogStartOffset = partitionData.logStartOffset
// for the follower replica, we do not need to keep
// its segment base offset and physical position;
// these values will be computed upon becoming the leader
replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+ replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
if (logger.isTraceEnabled)
trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
if (quota.isThrottled(topicPartition))
@@ -289,8 +292,10 @@ class ReplicaFetcherThread(name: String,
partitionMap.foreach { case (topicPartition, partitionFetchState) =>
// We will not include a replica in the fetch request if it should be throttled.
- if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition))
- requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize))
+ if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition)) {
+ val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
+ requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize))
+ }
}
val requestBuilder = JFetchRequest.Builder.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, requestMap)
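For clarity, each fetch entry now carries three fields instead of two. A short sketch of constructing one with illustrative values, matching the PartitionData constructor used in the hunk above:

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}

    val tp = new TopicPartition("my-topic", 0)
    // fetchOffset: next offset this follower wants; logStartOffset: the follower's
    // own log start offset, which the leader uses to compute the low watermark
    val entry = new JFetchRequest.PartitionData(500L, 120L, 1048576)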
@@ -313,7 +318,7 @@ object ReplicaFetcherThread {
private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest {
def isEmpty: Boolean = underlying.fetchData().isEmpty
def offset(topicPartition: TopicPartition): Long =
- underlying.fetchData().asScala(topicPartition).offset
+ underlying.fetchData().asScala(topicPartition).fetchOffset
}
private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
@@ -326,6 +331,8 @@ object ReplicaFetcherThread {
def highWatermark: Long = underlying.highWatermark
+ def logStartOffset: Long = underlying.logStartOffset
+
def exception: Option[Throwable] = error match {
case Errors.NONE => None
case e => Some(e.exception)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8b05ad40/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5ba093e..8f67425 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -29,12 +29,13 @@ import kafka.log.{Log, LogAppendInfo, LogManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.utils._
-import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, PolicyViolationException}
+import org.apache.kafka.common.errors.{NotLeaderForPartitionException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, DeleteRecordsRequest, DeleteRecordsResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -52,6 +53,13 @@ case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = N
}
}
+case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exception: Option[Throwable] = None) {
+ def error: Errors = exception match {
+ case None => Errors.NONE
+ case Some(e) => Errors.forException(e)
+ }
+}
+
/*
* Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read
@@ -63,7 +71,9 @@ case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = N
*/
case class LogReadResult(info: FetchDataInfo,
hw: Long,
+ leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
+ followerLogStartOffset: Long,
fetchTimeMs: Long,
readSize: Int,
exception: Option[Throwable] = None) {
@@ -74,16 +84,19 @@ case class LogReadResult(info: FetchDataInfo,
}
override def toString =
- s"Fetch Data: [$info], HW: [$hw], leaderLogEndOffset: [$leaderLogEndOffset], readSize: [$readSize], error: [$error]"
+ s"Fetch Data: [$info], HW: [$hw], leaderLogStartOffset: [$leaderLogStartOffset], leaderLogEndOffset: [$leaderLogEndOffset], " +
+ s"followerLogStartOffset: [$followerLogStartOffset], fetchTimeMs: [$fetchTimeMs], readSize: [$readSize], error: [$error]"
}
-case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, records: Records)
+case class FetchPartitionData(error: Errors = Errors.NONE, hw: Long = -1L, logStartOffset: Long, records: Records)
object LogReadResult {
val UnknownLogReadResult = LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
+ leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
+ followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = -1)
}
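A quick sketch of how the new LogDeleteRecordsResult type behaves; the error mapping mirrors LogAppendResult above:

    import org.apache.kafka.common.errors.OffsetOutOfRangeException
    import org.apache.kafka.common.protocol.Errors

    // success: everything before offset 800 was deleted and the low watermark moved there
    val ok = LogDeleteRecordsResult(requestedOffset = 800L, lowWatermark = 800L)
    assert(ok.error == Errors.NONE)
    // failure: the exception is carried along and translated to a wire-protocol error code
    val bad = LogDeleteRecordsResult(-1L, -1L, Some(new OffsetOutOfRangeException("requested offset is beyond the log end")))
    assert(bad.error == Errors.OFFSET_OUT_OF_RANGE)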
@@ -109,6 +122,7 @@ class ReplicaManager(val config: KafkaConfig,
val logManager: LogManager,
val isShuttingDown: AtomicBoolean,
quotaManager: ReplicationQuotaManager,
+ val metadataCache: MetadataCache,
threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
@@ -130,6 +144,8 @@ class ReplicaManager(val config: KafkaConfig,
purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", localBrokerId, config.fetchPurgatoryPurgeIntervalRequests)
+ val delayedDeleteRecordsPurgatory = DelayedOperationPurgatory[DelayedDeleteRecords](
+ purgatoryName = "DeleteRecords", localBrokerId, config.deleteRecordsPurgatoryPurgeIntervalRequests)
val leaderCount = newGauge(
"LeaderCount",
@@ -212,6 +228,15 @@ class ReplicaManager(val config: KafkaConfig,
debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed))
}
+ /**
+ * Try to complete some delayed DeleteRecordsRequests with the request key;
+ * this needs to be triggered when the partition low watermark has changed
+ */
+ def tryCompleteDelayedDeleteRecords(key: DelayedOperationKey) {
+ val completed = delayedDeleteRecordsPurgatory.checkAndComplete(key)
+ debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed))
+ }
+
def startup() {
// start ISR expiration thread
// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
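The tryCompleteDelayedDeleteRecords hook added above is the low-watermark counterpart of the fetch/produce hooks. A minimal sketch of the intended trigger (the real call happens when a partition's low watermark advances; the key type matches the one used for the other purgatories in this patch):

    import org.apache.kafka.common.TopicPartition

    // after partition [my-topic, 0]'s low watermark moves forward:
    replicaManager.tryCompleteDelayedDeleteRecords(
      new TopicPartitionOperationKey(new TopicPartition("my-topic", 0)))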
@@ -316,7 +341,7 @@ class ReplicaManager(val config: KafkaConfig,
new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
}
- if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
+ if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
@@ -345,14 +370,108 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ /**
+ * Delete records on the leader replicas of the partitions, and wait for the delete-records operation to be propagated to the other replicas;
+ * the callback function will be triggered either when the operation times out or when the logStartOffset of all live replicas has reached the specified offset
+ */
+ private def deleteRecordsOnLocalLog(offsetPerPartition: Map[TopicPartition, Long]): Map[TopicPartition, LogDeleteRecordsResult] = {
+ trace("Delete records on local logs to offsets [%s]".format(offsetPerPartition))
+ offsetPerPartition.map { case (topicPartition, requestedOffset) =>
+ // reject delete records operation on internal topics
+ if (Topic.isInternal(topicPartition.topic)) {
+ (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
+ } else {
+ try {
+ val partition = getPartition(topicPartition).getOrElse(
+ throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId)))
+ val convertedOffset =
+ if (requestedOffset == DeleteRecordsRequest.HIGH_WATERMARK) {
+ partition.leaderReplicaIfLocal match {
+ case Some(leaderReplica) =>
+ leaderReplica.highWatermark.messageOffset
+ case None =>
+ throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+ .format(topicPartition, localBrokerId))
+ }
+ } else
+ requestedOffset
+ if (convertedOffset < 0)
+ throw new OffsetOutOfRangeException(s"The offset $convertedOffset for partition $topicPartition is not valid")
+
+ val lowWatermark = partition.deleteRecordsOnLeader(convertedOffset)
+ (topicPartition, LogDeleteRecordsResult(convertedOffset, lowWatermark))
+ } catch {
+ // NOTE: as in the produce path, the failed-request metric is not incremented for known exceptions;
+ // it is supposed to indicate unexpected failures of a broker in handling a delete records request
+ case e: KafkaStorageException =>
+ fatal("Halting due to unrecoverable I/O error while handling DeleteRecordsRequest: ", e)
+ Runtime.getRuntime.halt(1)
+ (topicPartition, null)
+ case e@ (_: UnknownTopicOrPartitionException |
+ _: NotLeaderForPartitionException |
+ _: OffsetOutOfRangeException |
+ _: PolicyViolationException |
+ _: NotEnoughReplicasException) =>
+ (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(e)))
+ case t: Throwable =>
+ error("Error processing delete records operation on partition %s".format(topicPartition), t)
+ (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(t)))
+ }
+ }
+ }
+ }
+
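Note the sentinel handling above: DeleteRecordsRequest.HIGH_WATERMARK lets a caller ask to delete up to whatever the leader's high watermark currently is, without knowing the offset. A sketch of the two input shapes (topic names illustrative):

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.DeleteRecordsRequest

    val offsets = Map(
      new TopicPartition("logs", 0) -> 100L,                               // explicit offset
      new TopicPartition("logs", 1) -> DeleteRecordsRequest.HIGH_WATERMARK // resolved to the leader's HW
    )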
+ // If there exists a topic partition that meets both of the following requirements,
+ // we need to put a delayed DeleteRecordsRequest and wait for the delete records operation to complete
+ //
+ // 1. the delete records operation on this partition is successful
+ // 2. low watermark of this partition is smaller than the specified offset
+ private def delayedDeleteRecordsRequired(localDeleteRecordsResults: Map[TopicPartition, LogDeleteRecordsResult]): Boolean = {
+ localDeleteRecordsResults.exists { case (tp, deleteRecordsResult) =>
+ deleteRecordsResult.exception.isEmpty && deleteRecordsResult.lowWatermark < deleteRecordsResult.requestedOffset
+ }
+ }
+
+ def deleteRecords(timeout: Long,
+ offsetPerPartition: Map[TopicPartition, Long],
+ responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit) {
+ val timeBeforeLocalDeleteRecords = time.milliseconds
+ val localDeleteRecordsResults = deleteRecordsOnLocalLog(offsetPerPartition)
+ debug("Delete records on local log in %d ms".format(time.milliseconds - timeBeforeLocalDeleteRecords))
+
+ val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
+ topicPartition ->
+ DeleteRecordsPartitionStatus(
+ result.requestedOffset, // requested offset
+ new DeleteRecordsResponse.PartitionResponse(result.lowWatermark, result.error)) // response status
+ }
+
+ if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
+ // create delayed delete records operation
+ val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
+
+ // create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
+ val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
+
+ // try to complete the request immediately, otherwise put it into the purgatory
+ // this is because while the delayed delete records operation is being created, new
+ // requests may arrive and hence make this operation completable.
+ delayedDeleteRecordsPurgatory.tryCompleteElseWatch(delayedDeleteRecords, deleteRecordsRequestKeys)
+ } else {
+ // we can respond immediately
+ val deleteRecordsResponseStatus = deleteRecordsStatus.mapValues(status => status.responseStatus)
+ responseCallback(deleteRecordsResponseStatus)
+ }
+ }
+
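To tie the pieces together, a minimal caller sketch for the new deleteRecords entry point (in this patch the real caller is presumably the DeleteRecordsRequest handler in KafkaApis; the lowWatermark/error accessors on the response are assumed from the PartitionResponse constructor used above):

    import org.apache.kafka.common.TopicPartition

    replicaManager.deleteRecords(
      timeout = 30000L,
      offsetPerPartition = Map(new TopicPartition("logs", 0) -> 100L),
      responseCallback = results => results.foreach { case (tp, resp) =>
        // each response carries the partition's new low watermark, or an error code
        println(s"$tp -> lowWatermark=${resp.lowWatermark}, error=${resp.error}")
      }
    )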
// If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
//
// 1. required acks = -1
// 2. there is data to append
// 3. at least one partition append was successful (fewer errors than partitions)
- private def delayedRequestRequired(requiredAcks: Short,
- entriesPerPartition: Map[TopicPartition, MemoryRecords],
- localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
+ private def delayedProduceRequestRequired(requiredAcks: Short,
+ entriesPerPartition: Map[TopicPartition, MemoryRecords],
+ localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
requiredAcks == -1 &&
entriesPerPartition.nonEmpty &&
localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
@@ -471,7 +590,7 @@ class ReplicaManager(val config: KafkaConfig,
// 4) some error happens while reading data
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
- tp -> FetchPartitionData(result.error, result.hw, result.info.records)
+ tp -> FetchPartitionData(result.error, result.hw, result.leaderLogStartOffset, result.info.records)
}
responseCallback(fetchPartitionData)
} else {
@@ -508,8 +627,9 @@ class ReplicaManager(val config: KafkaConfig,
quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
- val offset = fetchInfo.offset
+ val offset = fetchInfo.fetchOffset
val partitionFetchSize = fetchInfo.maxBytes
+ val followerLogStartOffset = fetchInfo.logStartOffset
BrokerTopicStats.getBrokerTopicStats(tp.topic).totalFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
@@ -539,6 +659,7 @@ class ReplicaManager(val config: KafkaConfig,
*/
val initialLogEndOffset = localReplica.logEndOffset.messageOffset
val initialHighWatermark = localReplica.highWatermark.messageOffset
+ val initialLogStartOffset = localReplica.logStartOffset
val fetchTimeMs = time.milliseconds
val logReadInfo = localReplica.log match {
case Some(log) =>
@@ -563,7 +684,9 @@ class ReplicaManager(val config: KafkaConfig,
LogReadResult(info = logReadInfo,
hw = initialHighWatermark,
+ leaderLogStartOffset = initialLogStartOffset,
leaderLogEndOffset = initialLogEndOffset,
+ followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = fetchTimeMs,
readSize = partitionFetchSize,
exception = None)
@@ -576,7 +699,9 @@ class ReplicaManager(val config: KafkaConfig,
_: OffsetOutOfRangeException) =>
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
+ leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
+ followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = partitionFetchSize,
exception = Some(e))
@@ -586,7 +711,9 @@ class ReplicaManager(val config: KafkaConfig,
error(s"Error processing fetch operation on partition $tp, offset $offset", e)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
hw = -1L,
+ leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
+ followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = partitionFetchSize,
exception = Some(e))
@@ -622,7 +749,7 @@ class ReplicaManager(val config: KafkaConfig,
def getMagic(topicPartition: TopicPartition): Option[Byte] =
getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
- def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] = {
+ def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
@@ -640,7 +767,6 @@ class ReplicaManager(val config: KafkaConfig,
}
def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
- metadataCache: MetadataCache,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
@@ -695,7 +821,7 @@ class ReplicaManager(val config: KafkaConfig,
else
Set.empty[Partition]
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
- makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
+ makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
else
Set.empty[Partition]
@@ -801,8 +927,7 @@ class ReplicaManager(val config: KafkaConfig,
epoch: Int,
partitionState: Map[Partition, PartitionState],
correlationId: Int,
- responseMap: mutable.Map[TopicPartition, Errors],
- metadataCache: MetadataCache) : Set[Partition] = {
+ responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
partitionState.keys.foreach { partition =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition %s")