You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/06 18:51:13 UTC
[4/6] kafka git commit: KAFKA-5121;
Implement transaction index for KIP-98
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b7f340f..5722a43 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -30,9 +30,10 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
import scala.collection.{Seq, mutable}
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.{Time, Utils}
@@ -40,10 +41,13 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCod
import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import java.util.Map.{Entry => JEntry}
+import java.lang.{Long => JLong}
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
- NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, Map.empty[Long, ProducerAppendInfo], false)
+ NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}
/**
@@ -59,9 +63,6 @@ object LogAppendInfo {
* @param shallowCount The number of shallow messages
* @param validBytes The number of valid bytes
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
- * @param producerAppendInfos A map from a Pid to a ProducerAppendInfo, which is used to validate each Record in a
- * RecordBatch and keep track of metadata across Records in a RecordBatch.
- * @param isDuplicate Indicates whether the message set is a duplicate of a message at the tail of the log.
*/
case class LogAppendInfo(var firstOffset: Long,
var lastOffset: Long,
@@ -72,9 +73,19 @@ case class LogAppendInfo(var firstOffset: Long,
targetCodec: CompressionCodec,
shallowCount: Int,
validBytes: Int,
- offsetsMonotonic: Boolean,
- producerAppendInfos: Map[Long, ProducerAppendInfo],
- isDuplicate: Boolean = false)
+ offsetsMonotonic: Boolean)
+
+/**
+ * A class used to hold useful metadata about a completed transaction. This is used to build
+ * the transaction index after appending to the log.
+ *
+ * @param producerId The ID of the producer
+ * @param firstOffset The first offset (inclusive) of the transaction
+ * @param lastOffset The last offset (inclusive) of the transaction. This is always the offset of the
+ * COMMIT/ABORT control record which indicates the transaction's completion.
+ * @param isAborted Whether or not the transaction was aborted
+ */
+case class CompletedTxn(producerId: Long, firstOffset: Long, lastOffset: Long, isAborted: Boolean)
/**
* An append-only log for storing messages.
@@ -111,8 +122,7 @@ class Log(@volatile var dir: File,
scheduler: Scheduler,
time: Time = Time.SYSTEM,
val maxPidExpirationMs: Int = 60 * 60 * 1000,
- val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
- val pidSnapshotIntervalMs: Int = 60 * 1000) extends Logging with KafkaMetricsGroup {
+ val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
@@ -133,8 +143,10 @@ class Log(@volatile var dir: File,
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
- /* Construct and load PID map */
- private val pidMap = new ProducerIdMapping(config, topicPartition, dir, maxPidExpirationMs)
+ /* The earliest offset which is part of an incomplete transaction. This is used to compute the LSO. */
+ @volatile var firstUnstableOffset: Option[LogOffsetMetadata] = None
+
+ private val producerStateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs)
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@@ -147,7 +159,7 @@ class Log(@volatile var dir: File,
loadSegments()
/* Calculate the offset of the next message */
- nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
+ nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset,
activeSegment.size.toInt)
leaderEpochCache.clearLatest(nextOffsetMetadata.messageOffset)
@@ -157,7 +169,7 @@ class Log(@volatile var dir: File,
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.clearEarliest(logStartOffset)
- buildAndRecoverPidMap(logEndOffset)
+ loadProducerState(logEndOffset)
info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
.format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
@@ -189,19 +201,12 @@ class Log(@volatile var dir: File,
},
tags)
- scheduler.schedule(name = "PeriodicPidExpirationCheck", fun = () => {
+ scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {
lock synchronized {
- pidMap.removeExpiredPids(time.milliseconds)
+ producerStateManager.removeExpiredProducers(time.milliseconds)
}
}, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
- scheduler.schedule(name = "PeriodicPidSnapshotTask", fun = () => {
- lock synchronized {
- pidMap.maybeTakeSnapshot()
- }
- }, period = pidSnapshotIntervalMs, unit = TimeUnit.MILLISECONDS)
-
-
/** The name of this log */
def name = dir.getName()
@@ -212,13 +217,10 @@ class Log(@volatile var dir: File,
new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir)))
}
- /* Load the log segments from the log files on disk */
- private def loadSegments() {
+ private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
var swapFiles = Set[File]()
- // first do a pass through the files in the log directory and remove any temporary files
- // and find any interrupted swap operations
- for(file <- dir.listFiles if file.isFile) {
+ for (file <- dir.listFiles if file.isFile) {
if(!file.canRead)
throw new IOException("Could not read file " + file)
val filename = file.getName
@@ -229,48 +231,51 @@ class Log(@volatile var dir: File,
// we crashed in the middle of a swap operation, to recover:
// if a log, delete the .index file, complete the swap operation later
// if an index just delete it, it will be rebuilt
- val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
- if(baseName.getPath.endsWith(IndexFileSuffix)) {
+ val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
+ if (isIndexFile(baseFile)) {
Files.deleteIfExists(file.toPath)
- } else if(baseName.getPath.endsWith(LogFileSuffix)){
- // delete the index
- val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
- Files.deleteIfExists(index.toPath())
+ } else if (isLogFile(baseFile)) {
+ // delete the index files
+ val offset = offsetFromFilename(baseFile.getName)
+ Files.deleteIfExists(Log.offsetIndexFile(dir, offset).toPath)
+ Files.deleteIfExists(Log.timeIndexFile(dir, offset).toPath)
+ Files.deleteIfExists(Log.transactionIndexFile(dir, offset).toPath)
swapFiles += file
}
}
}
+ swapFiles
+ }
- // now do a second pass and load all the .log and all index files
- for(file <- dir.listFiles if file.isFile) {
+ private def loadSegmentFiles(): Unit = {
+ // load segments in ascending order because transactional data from one segment may depend on the
+ // segments that come before it
+ for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
val filename = file.getName
- if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
+ if (isIndexFile(file)) {
// if it is an index file, make sure it has a corresponding .log file
- val logFile =
- if (filename.endsWith(TimeIndexFileSuffix))
- new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
- else
- new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
-
- if(!logFile.exists) {
+ val offset = offsetFromFilename(filename)
+ val logFile = logFilename(dir, offset)
+ if (!logFile.exists) {
warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
Files.deleteIfExists(file.toPath)
}
- } else if(filename.endsWith(LogFileSuffix)) {
- // if its a log file, load the corresponding log segment
+ } else if (isLogFile(file)) {
+ // if it's a log file, load the corresponding log segment
val startOffset = offsetFromFilename(filename)
- val indexFile = Log.indexFilename(dir, startOffset)
- val timeIndexFile = Log.timeIndexFilename(dir, startOffset)
+ val indexFile = Log.offsetIndexFile(dir, startOffset)
+ val timeIndexFile = Log.timeIndexFile(dir, startOffset)
+ val txnIndexFile = Log.transactionIndexFile(dir, startOffset)
val indexFileExists = indexFile.exists()
val timeIndexFileExists = timeIndexFile.exists()
val segment = new LogSegment(dir = dir,
- startOffset = startOffset,
- indexIntervalBytes = config.indexInterval,
- maxIndexSize = config.maxIndexSize,
- rollJitterMs = config.randomSegmentJitter,
- time = time,
- fileAlreadyExists = true)
+ startOffset = startOffset,
+ indexIntervalBytes = config.indexInterval,
+ maxIndexSize = config.maxIndexSize,
+ rollJitterMs = config.randomSegmentJitter,
+ time = time,
+ fileAlreadyExists = true)
if (indexFileExists) {
try {
@@ -279,25 +284,43 @@ class Log(@volatile var dir: File,
if (!timeIndexFileExists)
segment.timeIndex.resize(0)
segment.timeIndex.sanityCheck()
+ segment.txnIndex.sanityCheck()
} catch {
case e: java.lang.IllegalArgumentException =>
warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
- s"${indexFile.getAbsolutePath} and rebuilding index...")
+ s"${indexFile.getAbsolutePath}, and ${txnIndexFile.getAbsolutePath} and rebuilding index...")
Files.deleteIfExists(timeIndexFile.toPath)
Files.delete(indexFile.toPath)
- segment.recover(config.maxMessageSize)
+ segment.txnIndex.delete()
+ recoverSegment(segment)
}
} else {
- error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
- segment.recover(config.maxMessageSize)
+ error("Could not find offset index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
+ recoverSegment(segment)
}
segments.put(startOffset, segment)
}
}
+ }
+
+ private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized {
+ val stateManager = new ProducerStateManager(topicPartition, dir, maxPidExpirationMs)
+ stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds)
+ logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment =>
+ val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset)
+ val fetchDataInfo = segment.read(startOffset, None, Int.MaxValue)
+ if (fetchDataInfo != null)
+ loadProducersFromLog(stateManager, fetchDataInfo.records)
+ }
+ val bytesTruncated = segment.recover(config.maxMessageSize, stateManager, leaderEpochCache)
- // Finally, complete any interrupted swap operations. To be crash-safe,
- // log files that are replaced by the swap segment should be renamed to .deleted
- // before the swap file is restored as the new segment file.
+ // once we have recovered the segment's data, take a snapshot to ensure that we won't
+ // need to reload the same segment again while recovering another segment.
+ stateManager.takeSnapshot()
+ bytesTruncated
+ }
+
+ private def completeSwapOperations(swapFiles: Set[File]): Unit = {
for (swapFile <- swapFiles) {
val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
val filename = logFile.getName
@@ -306,18 +329,36 @@ class Log(@volatile var dir: File,
val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+ val txnIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TxnIndexFileSuffix) + SwapFileSuffix)
+ val txnIndex = new TransactionIndex(startOffset, txnIndexFile)
val swapSegment = new LogSegment(FileRecords.open(swapFile),
- index = index,
- timeIndex = timeIndex,
- baseOffset = startOffset,
- indexIntervalBytes = config.indexInterval,
- rollJitterMs = config.randomSegmentJitter,
- time = time)
+ index = index,
+ timeIndex = timeIndex,
+ txnIndex = txnIndex,
+ baseOffset = startOffset,
+ indexIntervalBytes = config.indexInterval,
+ rollJitterMs = config.randomSegmentJitter,
+ time = time)
info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath))
- swapSegment.recover(config.maxMessageSize)
- val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
+ recoverSegment(swapSegment)
+ val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset())
replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
}
+ }
+
+ /* Load the log segments from the log files on disk */
+ private def loadSegments() {
+ // first do a pass through the files in the log directory and remove any temporary files
+ // and find any interrupted swap operations
+ val swapFiles = removeTempFilesAndCollectSwapFiles()
+
+ // now do a second pass and load all the log and index files
+ loadSegmentFiles()
+
+ // Finally, complete any interrupted swap operations. To be crash-safe,
+ // log files that are replaced by the swap segment should be renamed to .deleted
+ // before the swap file is restored as the new segment file.
+ completeSwapOperations(swapFiles)
if(logSegments.isEmpty) {
// no existing segments, create a new mutable segment beginning at offset 0
@@ -330,13 +371,11 @@ class Log(@volatile var dir: File,
fileAlreadyExists = false,
initFileSize = this.initFileSize(),
preallocate = config.preallocate))
- } else {
- if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
- recoverLog()
- // reset the index size of the currently active log segment to allow more entries
- activeSegment.index.resize(config.maxIndexSize)
- activeSegment.timeIndex.resize(config.maxIndexSize)
- }
+ } else if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
+ recoverLog()
+ // reset the index size of the currently active log segment to allow more entries
+ activeSegment.index.resize(config.maxIndexSize)
+ activeSegment.timeIndex.resize(config.maxIndexSize)
}
}
@@ -347,66 +386,72 @@ class Log(@volatile var dir: File,
private def recoverLog() {
// if we have the clean shutdown marker, skip recovery
if(hasCleanShutdownFile) {
- this.recoveryPoint = activeSegment.nextOffset
+ this.recoveryPoint = activeSegment.nextOffset()
return
}
// okay we need to actually recover this log
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
while(unflushed.hasNext) {
- val curr = unflushed.next
- info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
+ val segment = unflushed.next
+ info("Recovering unflushed segment %d in log %s.".format(segment.baseOffset, name))
val truncatedBytes =
try {
- curr.recover(config.maxMessageSize, Some(leaderEpochCache))
+ recoverSegment(segment, Some(leaderEpochCache))
} catch {
case _: InvalidOffsetException =>
- val startOffset = curr.baseOffset
+ val startOffset = segment.baseOffset
warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
"creating an empty one with starting offset " + startOffset)
- curr.truncateTo(startOffset)
+ segment.truncateTo(startOffset)
}
if(truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
- warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
+ warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(segment.baseOffset, name,
+ segment.nextOffset()))
unflushed.foreach(deleteSegment)
}
}
}
- /**
- * Creates an instance of id map for this log and updates the mapping
- * in the case it is missing some messages. Note that the id mapping
- * starts from a snapshot that is taken strictly before the log end
- * offset. Consequently, we need to process the tail of the log to update
- * the mapping.
- */
- private def buildAndRecoverPidMap(lastOffset: Long) {
- lock synchronized {
- info(s"Recovering PID mapping from offset $lastOffset for partition $topicPartition")
- val currentTimeMs = time.milliseconds
- pidMap.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
- logSegments(pidMap.mapEndOffset, lastOffset).foreach { segment =>
- val startOffset = math.max(segment.baseOffset, pidMap.mapEndOffset)
+ private def loadProducerState(lastOffset: Long): Unit = lock synchronized {
+ info(s"Loading producer state from offset $lastOffset for partition $topicPartition")
+ val currentTimeMs = time.milliseconds
+ producerStateManager.truncateAndReload(logStartOffset, lastOffset, currentTimeMs)
+
+ // only do the potentially expensive reloading if the last snapshot offset is lower than the
+ // log end offset (which would be the case on first startup) and there are active producers.
+ // if there are no active producers, then truncating shouldn't change that fact (although it
+ // could cause a producerId to expire earlier than expected), so we can skip the loading.
+ // This is an optimization for users who are not yet using idempotent/transactional features.
+ if (lastOffset > producerStateManager.mapEndOffset || !producerStateManager.isEmpty) {
+ logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
+ val startOffset = math.max(segment.baseOffset, producerStateManager.mapEndOffset)
val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
- if (fetchDataInfo != null) {
- fetchDataInfo.records.batches.asScala.foreach { batch =>
- if (batch.hasProducerId) {
- val pidEntry = ProducerIdEntry(batch.producerEpoch, batch.lastSequence, batch.lastOffset,
- batch.lastSequence - batch.baseSequence, batch.maxTimestamp)
- pidMap.load(batch.producerId, pidEntry, currentTimeMs)
- }
- }
- }
+ if (fetchDataInfo != null)
+ loadProducersFromLog(producerStateManager, fetchDataInfo.records)
}
- pidMap.updateMapEndOffset(lastOffset)
}
+
+ producerStateManager.updateMapEndOffset(lastOffset)
+ updateFirstUnstableOffset()
}
- private[log] def activePids: Map[Long, ProducerIdEntry] = {
- lock synchronized {
- pidMap.activePids
+ private def loadProducersFromLog(producerStateManager: ProducerStateManager, records: Records): Unit = {
+ val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
+ val completedTxns = ListBuffer.empty[CompletedTxn]
+ records.batches.asScala.foreach { batch =>
+ if (batch.hasProducerId) {
+ val lastEntry = producerStateManager.lastEntry(batch.producerId)
+ updateProducers(batch, loadedProducers, completedTxns, lastEntry, loadingFromLog = true)
+ }
}
+ loadedProducers.values.foreach(producerStateManager.update)
+ completedTxns.foreach(producerStateManager.completeTxn)
+ }
+
+ private[log] def activePids: Map[Long, ProducerIdEntry] = lock synchronized {
+ producerStateManager.activeProducers
}
/**
@@ -426,47 +471,50 @@ class Log(@volatile var dir: File,
def close() {
debug(s"Closing log $name")
lock synchronized {
+ producerStateManager.takeSnapshot()
logSegments.foreach(_.close())
}
}
/**
- * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
- * @param records The records to append
- * @throws KafkaStorageException If the append fails due to an I/O error.
- * @return Information about the appended messages including the first and last offset.
- */
- def appendAsLeader(records: MemoryRecords, leaderEpoch: Int): LogAppendInfo = {
- append(records, assignOffsets = true, leaderEpoch)
+ * Append this message set to the active segment of the log, assigning offsets and Partition Leader Epochs
+ * @param records The records to append
+ * @param isFromClient Whether or not this append is from a producer
+ * @throws KafkaStorageException If the append fails due to an I/O error.
+ * @return Information about the appended messages including the first and last offset.
+ */
+ def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
+ append(records, isFromClient, assignOffsets = true, leaderEpoch)
}
/**
- * Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs
- * @param records The records to append
- * @throws KafkaStorageException If the append fails due to an I/O error.
- * @return Information about the appended messages including the first and last offset.
- */
+ * Append this message set to the active segment of the log without assigning offsets or Partition Leader Epochs
+ * @param records The records to append
+ * @throws KafkaStorageException If the append fails due to an I/O error.
+ * @return Information about the appended messages including the first and last offset.
+ */
def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
- append(records, assignOffsets = false, leaderEpoch = -1)
+ append(records, isFromClient = false, assignOffsets = false, leaderEpoch = -1)
}
/**
- * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
- *
- * This method will generally be responsible for assigning offsets to the messages,
- * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
- *
- * @param records The log records to append
- * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
- * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
- * @throws KafkaStorageException If the append fails due to an I/O error.
- * @return Information about the appended messages including the first and last offset.
- */
- private def append(records: MemoryRecords, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = {
- val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets)
+ * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
+ *
+ * This method will generally be responsible for assigning offsets to the messages,
+ * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
+ *
+ * @param records The log records to append
+ * @param isFromClient Whether or not this append is from a producer
+ * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+ * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
+ * @throws KafkaStorageException If the append fails due to an I/O error.
+ * @return Information about the appended messages including the first and last offset.
+ */
+ private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean = true, leaderEpoch: Int): LogAppendInfo = {
+ val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
// return if we have no valid messages or if this is a duplicate of the last appended entry
- if (appendInfo.shallowCount == 0 || appendInfo.isDuplicate)
+ if (appendInfo.shallowCount == 0)
return appendInfo
// trim any invalid bytes or partial messages before appending it to the on-disk log
@@ -483,15 +531,16 @@ class Log(@volatile var dir: File,
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
- offset,
- now,
- appendInfo.sourceCodec,
- appendInfo.targetCodec,
- config.compact,
- config.messageFormatVersion.messageFormatVersion,
- config.messageTimestampType,
- config.messageTimestampDifferenceMaxMs,
- leaderEpoch)
+ offset,
+ now,
+ appendInfo.sourceCodec,
+ appendInfo.targetCodec,
+ config.compact,
+ config.messageFormatVersion.messageFormatVersion,
+ config.messageTimestampType,
+ config.messageTimestampDifferenceMaxMs,
+ leaderEpoch,
+ isFromClient)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
@@ -534,34 +583,56 @@ class Log(@volatile var dir: File,
.format(validRecords.sizeInBytes, config.segmentSize))
}
+ // now that we have valid records, offsets assigned, and timestamps updated, we need to
+ // validate the idempotent/transactional state of the producers and collect some metadata
+ val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
+ maybeDuplicate.foreach { duplicate =>
+ appendInfo.firstOffset = duplicate.firstOffset
+ appendInfo.lastOffset = duplicate.lastOffset
+ appendInfo.logAppendTime = duplicate.timestamp
+ return appendInfo
+ }
+
// maybe roll the log if this segment is full
val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
maxTimestampInMessages = appendInfo.maxTimestamp,
maxOffsetInMessages = appendInfo.lastOffset)
- // now append to the log
+ val logOffsetMetadata = LogOffsetMetadata(
+ messageOffset = appendInfo.firstOffset,
+ segmentBaseOffset = segment.baseOffset,
+ relativePositionInSegment = segment.size)
+
segment.append(firstOffset = appendInfo.firstOffset,
largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
- // update the PID sequence mapping
- for ((pid, producerAppendInfo) <- appendInfo.producerAppendInfos) {
- trace(s"Updating pid with sequence: $pid -> ${producerAppendInfo.lastEntry}")
+ // update the producer state
+ for ((producerId, producerAppendInfo) <- updatedProducers) {
+ trace(s"Updating producer $producerId state: ${producerAppendInfo.lastEntry}")
+ producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
+ producerStateManager.update(producerAppendInfo)
+ }
- if (assignOffsets)
- producerAppendInfo.assignLastOffsetAndTimestamp(appendInfo.lastOffset, appendInfo.maxTimestamp)
- pidMap.update(producerAppendInfo)
+ // update the transaction index with the true last stable offset. The last offset visible
+ // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
+ for (completedTxn <- completedTxns) {
+ val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+ segment.updateTxnIndex(completedTxn, lastStableOffset)
}
// always update the last pid map offset so that the snapshot reflects the current offset
// even if there isn't any idempotent data being written
- pidMap.updateMapEndOffset(appendInfo.lastOffset + 1)
+ producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
+ // update the first unstable offset (which is used to compute LSO)
+ updateFirstUnstableOffset()
+
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
.format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
@@ -575,6 +646,24 @@ class Log(@volatile var dir: File,
}
}
+ def onHighWatermarkIncremented(highWatermark: Long): Unit = {
+ lock synchronized {
+ producerStateManager.onHighWatermarkUpdated(highWatermark)
+ updateFirstUnstableOffset()
+ }
+ }
+
+ private def updateFirstUnstableOffset(): Unit = lock synchronized {
+ this.firstUnstableOffset = producerStateManager.firstUnstableOffset match {
+ case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly =>
+ val offset = logOffsetMetadata.messageOffset
+ val segment = segments.floorEntry(offset).getValue
+ val position = segment.translateOffset(offset)
+ Some(LogOffsetMetadata(offset, segment.baseOffset, position.position))
+ case other => other
+ }
+ }
+
/**
* Increment the log start offset if the provided offset is larger.
*/
@@ -589,6 +678,23 @@ class Log(@volatile var dir: File,
}
}
+ private def analyzeAndValidateProducerState(records: MemoryRecords, isFromClient: Boolean):
+ (mutable.Map[Long, ProducerAppendInfo], List[CompletedTxn], Option[ProducerIdEntry]) = {
+ val updatedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
+ val completedTxns = ListBuffer.empty[CompletedTxn]
+ for (batch <- records.batches.asScala if batch.hasProducerId) {
+ val maybeLastEntry = producerStateManager.lastEntry(batch.producerId)
+
+ // if this is a client produce request, there will be only one batch. If that batch matches
+ // the last appended entry for that producer, then this request is a duplicate and we return
+ // the last appended entry to the client.
+ if (isFromClient && maybeLastEntry.exists(_.isDuplicate(batch)))
+ return (updatedProducers, completedTxns.toList, maybeLastEntry)
+ updateProducers(batch, updatedProducers, completedTxns, maybeLastEntry, loadingFromLog = false)
+ }
+ (updatedProducers, completedTxns.toList, None)
+ }
+
/**
* Validate the following:
* <ol>
@@ -616,8 +722,6 @@ class Log(@volatile var dir: File,
var monotonic = true
var maxTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
- var isDuplicate = false
- val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
for (batch <- records.batches.asScala) {
// we only validate V2 and higher to avoid potential compatibility issues with older clients
@@ -660,37 +764,23 @@ class Log(@volatile var dir: File,
val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
if (messageCodec != NoCompressionCodec)
sourceCodec = messageCodec
-
- val pid = batch.producerId
- if (pid != RecordBatch.NO_PRODUCER_ID) {
- producerAppendInfos.get(pid) match {
- case Some(appendInfo) => appendInfo.append(batch)
- case None =>
- val lastEntry = pidMap.lastEntry(pid).getOrElse(ProducerIdEntry.Empty)
- if (isFromClient && lastEntry.isDuplicate(batch)) {
- // This request is a duplicate so return the information about the existing entry. Note that for requests
- // coming from the client, there will only be one RecordBatch per request, so there will be only one iteration
- // of the loop and the values below will not be updated more than once.
- isDuplicate = true
- firstOffset = lastEntry.firstOffset
- lastOffset = lastEntry.lastOffset
- maxTimestamp = lastEntry.timestamp
- debug(s"Detected a duplicate for partition $topicPartition at (firstOffset, lastOffset): ($firstOffset, $lastOffset). " +
- "Ignoring the incoming record.")
- } else {
- val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry)
- producerAppendInfos.put(pid, producerAppendInfo)
- producerAppendInfo.append(batch)
- }
- }
- }
}
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
-
LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec,
- targetCodec, shallowMessageCount, validBytesCount, monotonic, producerAppendInfos.toMap, isDuplicate)
+ targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ }
+
+ private def updateProducers(batch: RecordBatch,
+ producers: mutable.Map[Long, ProducerAppendInfo],
+ completedTxns: ListBuffer[CompletedTxn],
+ lastEntry: Option[ProducerIdEntry],
+ loadingFromLog: Boolean): Unit = {
+ val pid = batch.producerId
+ val appendInfo = producers.getOrElseUpdate(pid, new ProducerAppendInfo(pid, lastEntry, loadingFromLog))
+ val maybeCompletedTxn = appendInfo.append(batch)
+ maybeCompletedTxn.foreach(completedTxns += _)
}
/**
@@ -721,11 +811,19 @@ class Log(@volatile var dir: File,
* @param maxLength The maximum number of bytes to read
* @param maxOffset The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
* @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
+ * @param isolationLevel The isolation level of the fetcher. The READ_UNCOMMITTED isolation level has the traditional
+ * read semantics (e.g. consumers are limited to fetching up to the high watermark). In
+ * READ_COMMITTED, consumers are limited to fetching up to the last stable offset. Additionally,
+ * in READ_COMMITTED, the transaction index is consulted after fetching to collect the list
+ * of aborted transactions in the fetch range which the consumer uses to filter the fetched
+ * records before they are returned to the user. Note that fetches from followers always use
+ * READ_UNCOMMITTED.
*
* @throws OffsetOutOfRangeException If startOffset is beyond the log end offset or before the log start offset
* @return The fetch data information including fetch starting offset metadata and messages read.
*/
- def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false): FetchDataInfo = {
+ def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
+ isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
// Because we don't use lock for reading, the synchronization is a little bit tricky.
@@ -735,38 +833,43 @@ class Log(@volatile var dir: File,
if(startOffset == next)
return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY)
- var entry = segments.floorEntry(startOffset)
+ var segmentEntry = segments.floorEntry(startOffset)
// return error on attempt to read beyond the log end offset or read below log start offset
- if(startOffset > next || entry == null || startOffset < logStartOffset)
+ if(startOffset > next || segmentEntry == null || startOffset < logStartOffset)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, logStartOffset, next))
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
- while(entry != null) {
+ while(segmentEntry != null) {
+ val segment = segmentEntry.getValue
+
// If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
// the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
// cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
// end of the active segment.
val maxPosition = {
- if (entry == segments.lastEntry) {
+ if (segmentEntry == segments.lastEntry) {
val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
// Check the segment again in case a new segment has just rolled out.
- if (entry != segments.lastEntry)
+ if (segmentEntry != segments.lastEntry)
// New log segment has rolled out, we can read up to the file end.
- entry.getValue.size
+ segment.size
else
exposedPos
} else {
- entry.getValue.size
+ segment.size
}
}
- val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
- if(fetchInfo == null) {
- entry = segments.higherEntry(entry.getKey)
+ val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
+ if (fetchInfo == null) {
+ segmentEntry = segments.higherEntry(segmentEntry.getKey)
} else {
- return fetchInfo
+ return isolationLevel match {
+ case IsolationLevel.READ_UNCOMMITTED => fetchInfo
+ case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
+ }
}
}
@@ -776,6 +879,41 @@ class Log(@volatile var dir: File,
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
+ private def addAbortedTransactions(startOffset: Long, segmentEntry: JEntry[JLong, LogSegment],
+ fetchInfo: FetchDataInfo): FetchDataInfo = {
+ val fetchSize = fetchInfo.records.sizeInBytes
+ val startOffsetPosition = OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+ fetchInfo.fetchOffsetMetadata.relativePositionInSegment)
+ val upperBoundOffset = segmentEntry.getValue.fetchUpperBoundOffset(startOffsetPosition, fetchSize).getOrElse {
+ val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
+ if (nextSegmentEntry != null)
+ nextSegmentEntry.getValue.baseOffset
+ else
+ logEndOffset
+ }
+ val abortedTransactions = collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry)
+ FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
+ records = fetchInfo.records,
+ firstEntryIncomplete = fetchInfo.firstEntryIncomplete,
+ abortedTransactions = Some(abortedTransactions))
+ }
+
+ private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
+ startingSegmentEntry: JEntry[JLong, LogSegment]): List[AbortedTransaction] = {
+ var segmentEntry = startingSegmentEntry
+ val abortedTransactions = ListBuffer.empty[AbortedTransaction]
+
+ while (segmentEntry != null) {
+ val searchResult = segmentEntry.getValue.collectAbortedTxns(startOffset, upperBoundOffset)
+ abortedTransactions ++= searchResult.abortedTransactions
+ if (searchResult.isComplete)
+ return abortedTransactions.toList
+
+ segmentEntry = segments.higherEntry(segmentEntry.getKey)
+ }
+ abortedTransactions.toList
+ }
+
/**
* Get an offset based on the given timestamp
* The offset returned is the offset of the first message whose timestamp is greater than or equals to the
@@ -860,7 +998,8 @@ class Log(@volatile var dir: File,
deletable.foreach(deleteSegment)
logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
leaderEpochCache.clearEarliest(logStartOffset)
- pidMap.expirePids(logStartOffset)
+ producerStateManager.evictUnretainedProducers(logStartOffset)
+ updateFirstUnstableOffset()
}
}
numToDelete
@@ -934,7 +1073,7 @@ class Log(@volatile var dir: File,
def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
/**
- * The offset of the next message that will be appended to the log
+ * The offset of the next message that will be appended to the log
*/
def logEndOffset: Long = nextOffsetMetadata.messageOffset
@@ -990,9 +1129,10 @@ class Log(@volatile var dir: File,
lock synchronized {
val newOffset = math.max(expectedNextOffset, logEndOffset)
val logFile = logFilename(dir, newOffset)
- val indexFile = indexFilename(dir, newOffset)
- val timeIndexFile = timeIndexFilename(dir, newOffset)
- for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
+ val offsetIdxFile = offsetIndexFile(dir, newOffset)
+ val timeIdxFile = timeIndexFile(dir, newOffset)
+ val txnIdxFile = transactionIndexFile(dir, newOffset)
+ for(file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
}
@@ -1007,6 +1147,15 @@ class Log(@volatile var dir: File,
seg.log.trim()
}
}
+
+ // Take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
+ // offset align with the new segment offset since this ensures we can recover the segment by beginning
+ // with the corresponding snapshot file and scanning the segment data. Because the segment base offset
+ // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
+ // we manually override the state offset here prior to taking the snapshot.
+ producerStateManager.updateMapEndOffset(newOffset)
+ producerStateManager.takeSnapshot()
+
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
@@ -1053,6 +1202,12 @@ class Log(@volatile var dir: File,
time.milliseconds + " unflushed = " + unflushedMessages)
for(segment <- logSegments(this.recoveryPoint, offset))
segment.flush()
+
+ // now that we have flushed, we can clean up old producer snapshots. However, it is useful to retain
+ // the snapshots from the recent segments in case we need to truncate and rebuild the producer state.
+ // Otherwise, we would always need to rebuild from the earliest segment.
+ producerStateManager.deleteSnapshotsBefore(minSnapshotOffsetToRetain(offset))
+
lock synchronized {
if(offset > this.recoveryPoint) {
this.recoveryPoint = offset
@@ -1061,6 +1216,17 @@ class Log(@volatile var dir: File,
}
}
+ def minSnapshotOffsetToRetain(flushedOffset: Long) = {
+ // always retain the producer snapshot from the last two segments. This solves the common case
+ // of truncating to an offset within the active segment, and the rarer case of truncating to the
+ // previous segment just after rolling the new segment.
+ var minSnapshotOffset = activeSegment.baseOffset
+ val previousSegment = segments.lowerEntry(activeSegment.baseOffset)
+ if (previousSegment != null)
+ minSnapshotOffset = previousSegment.getValue.baseOffset
+ math.min(flushedOffset, minSnapshotOffset)
+ }
+
/**
* Completely delete this log directory and all contents from the file system with no delay
*/
@@ -1073,11 +1239,25 @@ class Log(@volatile var dir: File,
}
}
- private[log] def maybeTakePidSnapshot(): Unit = pidMap.maybeTakeSnapshot()
+ // visible for testing
+ private[log] def takeProducerSnapshot(): Unit = lock synchronized {
+ producerStateManager.takeSnapshot()
+ }
- private[log] def latestPidSnapshotOffset: Option[Long] = pidMap.latestSnapshotOffset
+ // visible for testing
+ private[log] def latestProducerSnapshotOffset: Option[Long] = lock synchronized {
+ producerStateManager.latestSnapshotOffset
+ }
- private[log] def latestPidMapOffset: Long = pidMap.mapEndOffset
+ // visible for testing
+ private[log] def oldestProducerSnapshotOffset: Option[Long] = lock synchronized {
+ producerStateManager.oldestSnapshotOffset
+ }
+
+ // visible for testing
+ private[log] def latestProducerStateEndOffset: Long = lock synchronized {
+ producerStateManager.mapEndOffset
+ }
/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
@@ -1103,7 +1283,7 @@ class Log(@volatile var dir: File,
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
this.logStartOffset = math.min(targetOffset, this.logStartOffset)
leaderEpochCache.clearLatest(targetOffset)
- buildAndRecoverPidMap(targetOffset)
+ loadProducerState(targetOffset)
}
}
}
@@ -1130,8 +1310,9 @@ class Log(@volatile var dir: File,
updateLogEndOffset(newOffset)
leaderEpochCache.clear()
- pidMap.truncate()
- pidMap.updateMapEndOffset(newOffset)
+ producerStateManager.truncate()
+ producerStateManager.updateMapEndOffset(newOffset)
+ updateFirstUnstableOffset()
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
this.logStartOffset = newOffset
@@ -1282,6 +1463,9 @@ object Log {
val PidSnapshotFileSuffix = ".snapshot"
+ /** an (aborted) txn index */
+ val TxnIndexFileSuffix = ".txnindex"
+
/** a file that is scheduled to be deleted */
val DeletedFileSuffix = ".deleted"
@@ -1331,7 +1515,7 @@ object Log {
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
- def indexFilename(dir: File, offset: Long) =
+ def offsetIndexFile(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
/**
@@ -1340,7 +1524,7 @@ object Log {
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
- def timeIndexFilename(dir: File, offset: Long) =
+ def timeIndexFile(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
/**
@@ -1349,9 +1533,12 @@ object Log {
* @param dir The directory in which the log will reside
* @param offset The last offset (exclusive) included in the snapshot
*/
- def pidSnapshotFilename(dir: File, offset: Long) =
+ def producerSnapshotFile(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + PidSnapshotFileSuffix)
+ def transactionIndexFile(dir: File, offset: Long) =
+ new File(dir, filenamePrefixFromOffset(offset) + TxnIndexFileSuffix)
+
def offsetFromFilename(filename: String): Long =
filename.substring(0, filename.indexOf('.')).toLong
@@ -1387,4 +1574,12 @@ object Log {
new TopicPartition(topic, partition.toInt)
}
+ private def isIndexFile(file: File): Boolean = {
+ val filename = file.getName
+ filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix) || filename.endsWith(TxnIndexFileSuffix)
+ }
+
+ private def isLogFile(file: File): Boolean =
+ file.getPath.endsWith(LogFileSuffix)
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index d0e8ec4..282e049 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -388,12 +388,18 @@ private[log] class Cleaner(val id: Int,
logFile.delete()
val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
+ val txnIndexFile = new File(segments.head.txnIndex.file.getPath + Log.CleanedFileSuffix)
indexFile.delete()
timeIndexFile.delete()
+ txnIndexFile.delete()
+
+ val startOffset = segments.head.baseOffset
val records = FileRecords.open(logFile, false, log.initFileSize(), log.config.preallocate)
- val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
- val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize)
- val cleaned = new LogSegment(records, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
+ val index = new OffsetIndex(indexFile, startOffset, segments.head.index.maxIndexSize)
+ val timeIndex = new TimeIndex(timeIndexFile, startOffset, segments.head.timeIndex.maxIndexSize)
+ val txnIndex = new TransactionIndex(startOffset, txnIndexFile)
+ val cleaned = new LogSegment(records, index, timeIndex, txnIndex, startOffset,
+ segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
try {
// clean segments into the new destination segment
@@ -451,7 +457,8 @@ private[log] class Cleaner(val id: Int,
activePids: Map[Long, ProducerIdEntry],
stats: CleanerStats) {
val logCleanerFilter = new RecordFilter {
- def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats, activePids, recordBatch.producerId)
+ def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean =
+ shouldRetainMessage(source, map, retainDeletes, recordBatch, record, stats, activePids)
}
var position = 0
@@ -492,17 +499,20 @@ private[log] class Cleaner(val id: Int,
private def shouldRetainMessage(source: kafka.log.LogSegment,
map: kafka.log.OffsetMap,
retainDeletes: Boolean,
+ batch: RecordBatch,
record: Record,
stats: CleanerStats,
- activePids: Map[Long, ProducerIdEntry],
- pid: Long): Boolean = {
- if (record.isControlRecord)
+ activeProducers: Map[Long, ProducerIdEntry]): Boolean = {
+ if (batch.isControlBatch)
return true
// retain the record if it is the last one produced by an active idempotent producer to ensure that
- // the PID is not removed from the log before it has been expired
- if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset))
- return true
+ // the producerId is not removed from the log before it has been expired
+ if (batch.hasProducerId) {
+ val producerId = batch.producerId
+ if (RecordBatch.NO_PRODUCER_ID < producerId && activeProducers.get(producerId).exists(_.lastOffset == record.offset))
+ return true
+ }
val pastLatestOffset = record.offset > map.latestOffset
if (pastLatestOffset)
@@ -638,8 +648,8 @@ private[log] class Cleaner(val id: Int,
throttler.maybeThrottle(records.sizeInBytes)
val startPosition = position
- for (record <- records.records.asScala) {
- if (!record.isControlRecord && record.hasKey && record.offset >= start) {
+ for (batch <- records.batches.asScala; record <- batch.asScala) {
+ if (!batch.isControlBatch && record.hasKey && record.offset >= start) {
if (map.size < maxDesiredMapSize)
map.put(record.key, record.offset)
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index b89fc40..c621680 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -189,7 +189,7 @@ class LogManager(val logDirs: Array[File],
}
}
- jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
+ jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
}
@@ -282,7 +282,6 @@ class LogManager(val logDirs: Array[File],
jobs(dir) = jobsForDir.map(pool.submit).toSeq
}
-
try {
for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
@@ -312,7 +311,6 @@ class LogManager(val logDirs: Array[File],
info("Shutdown complete.")
}
-
/**
* Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
*
@@ -454,7 +452,7 @@ class LogManager(val logDirs: Array[File],
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
}
-}
+ }
/**
* Rename the directory of the given topic-partition "logdir" as "logdir.uuid.delete" and
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index df3c372..d76b47a 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -27,14 +27,14 @@ import kafka.server.epoch.LeaderEpochCache
import kafka.server.{FetchDataInfo, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.record.FileRecords.LogEntryPosition
+import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
import scala.math._
- /**
+/**
* A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
* the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each
* segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
@@ -53,6 +53,7 @@ import scala.math._
class LogSegment(val log: FileRecords,
val index: OffsetIndex,
val timeIndex: TimeIndex,
+ val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
@@ -67,45 +68,49 @@ class LogSegment(val log: FileRecords,
private var rollingBasedTimestamp: Option[Long] = None
/* The maximum timestamp we see so far */
- @volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp
- @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset
+ @volatile private var maxTimestampSoFar: Long = timeIndex.lastEntry.timestamp
+ @volatile private var offsetOfMaxTimestamp: Long = timeIndex.lastEntry.offset
- def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
+ def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time,
+ fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
this(FileRecords.open(Log.logFilename(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
- new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
- new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+ new OffsetIndex(Log.offsetIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+ new TimeIndex(Log.timeIndexFile(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+ new TransactionIndex(startOffset, Log.transactionIndexFile(dir, startOffset)),
startOffset,
indexIntervalBytes,
rollJitterMs,
time)
/* Return the size in bytes of this log segment */
- def size: Long = log.sizeInBytes()
+ def size: Int = log.sizeInBytes()
- /**
- * checks that the argument offset can be represented as an integer offset relative to the baseOffset.
- */
- def canConvertToRelativeOffset(offset: Long): Boolean = {
- (offset - baseOffset) <= Integer.MAX_VALUE
- }
+ /**
+ * checks that the argument offset can be represented as an integer offset relative to the baseOffset.
+ */
+ def canConvertToRelativeOffset(offset: Long): Boolean = {
+ (offset - baseOffset) <= Integer.MAX_VALUE
+ }
- /**
+ /**
* Append the given messages starting with the given offset. Add
* an entry to the index if needed.
*
* It is assumed this method is being called from within a lock.
*
* @param firstOffset The first offset in the message set.
+ * @param largestOffset The last offset in the message set
* @param largestTimestamp The largest timestamp in the message set.
* @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
* @param records The log entries to append.
+ * @return the physical position in the file of the appended records
*/
@nonthreadsafe
def append(firstOffset: Long,
largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
- records: MemoryRecords) {
+ records: MemoryRecords): Unit = {
if (records.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
.format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
@@ -131,6 +136,28 @@ class LogSegment(val log: FileRecords,
}
}
+ @nonthreadsafe
+ def updateTxnIndex(completedTxn: CompletedTxn, lastStableOffset: Long) {
+ if (completedTxn.isAborted) {
+ trace(s"Writing aborted transaction $completedTxn to transaction index, last stable offset is $lastStableOffset")
+ txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset))
+ }
+ }
+
+ private def updateProducerState(producerStateManager: ProducerStateManager, batch: RecordBatch): Unit = {
+ if (batch.hasProducerId) {
+ val producerId = batch.producerId
+ val lastEntry = producerStateManager.lastEntry(producerId)
+ val appendInfo = new ProducerAppendInfo(batch.producerId, lastEntry, loadingFromLog = true)
+ val maybeCompletedTxn = appendInfo.append(batch)
+ producerStateManager.update(appendInfo)
+ maybeCompletedTxn.foreach { completedTxn =>
+ val lastStableOffset = producerStateManager.completeTxn(completedTxn)
+ updateTxnIndex(completedTxn, lastStableOffset)
+ }
+ }
+ }
+
/**
* Find the physical file position for the first message with offset >= the requested offset.
*
@@ -144,7 +171,7 @@ class LogSegment(val log: FileRecords,
* message or null if no message meets this criteria.
*/
@threadsafe
- private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
+ private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
val mapping = index.lookup(offset)
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
@@ -175,7 +202,7 @@ class LogSegment(val log: FileRecords,
if (startOffsetAndSize == null)
return null
- val startPosition = startOffsetAndSize.position.toInt
+ val startPosition = startOffsetAndSize.position
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
@@ -187,7 +214,7 @@ class LogSegment(val log: FileRecords,
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
- val length = maxOffset match {
+ val fetchSize: Int = maxOffset match {
case None =>
// no max offset, just read until the max position
min((maxPosition - startPosition).toInt, adjustedMaxSize)
@@ -207,24 +234,32 @@ class LogSegment(val log: FileRecords,
min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
}
- FetchDataInfo(offsetMetadata, log.read(startPosition, length),
+ FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
+ def fetchUpperBoundOffset(startOffsetPosition: OffsetPosition, fetchSize: Int): Option[Long] =
+ index.fetchUpperBoundOffset(startOffsetPosition, fetchSize).map(_.offset)
+
/**
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
*
* @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
* is corrupt.
- * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
+ * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
+ * the transaction index.
+ * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
* @return The number of bytes truncated from the log
*/
@nonthreadsafe
- def recover(maxMessageSize: Int, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
+ def recover(maxMessageSize: Int,
+ producerStateManager: ProducerStateManager,
+ leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
index.truncate()
index.resize(index.maxIndexSize)
timeIndex.truncate()
timeIndex.resize(timeIndex.maxIndexSize)
+ txnIndex.truncate()
var validBytes = 0
var lastIndexEntry = 0
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
@@ -250,8 +285,9 @@ class LogSegment(val log: FileRecords,
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
- cache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
+ cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
+ updateProducerState(producerStateManager, batch)
}
}
} catch {
@@ -268,22 +304,23 @@ class LogSegment(val log: FileRecords,
truncated
}
- def loadLargestTimestamp(readToLogEnd: Boolean = false) {
+ private def loadLargestTimestamp() {
// Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
val lastTimeIndexEntry = timeIndex.lastEntry
maxTimestampSoFar = lastTimeIndexEntry.timestamp
offsetOfMaxTimestamp = lastTimeIndexEntry.offset
- if (readToLogEnd) {
- val offsetPosition = index.lookup(lastTimeIndexEntry.offset)
- // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
- val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
- if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
- maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp
- offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset
- }
+
+ val offsetPosition = index.lookup(lastTimeIndexEntry.offset)
+ // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
+ val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
+ if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
+ maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp
+ offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset
}
}
+ def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult =
+ txnIndex.collectAbortedTxns(fetchOffset, upperBoundOffset)
override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
@@ -301,6 +338,7 @@ class LogSegment(val log: FileRecords,
return 0
index.truncateTo(offset)
timeIndex.truncateTo(offset)
+ txnIndex.truncateTo(offset)
// after truncation, reset and allocate more space for the (new currently active) index
index.resize(index.maxIndexSize)
timeIndex.resize(timeIndex.maxIndexSize)
@@ -310,9 +348,8 @@ class LogSegment(val log: FileRecords,
rollingBasedTimestamp = None
}
bytesSinceLastIndexEntry = 0
- // We may need to reload the max timestamp after truncation.
if (maxTimestampSoFar >= 0)
- loadLargestTimestamp(readToLogEnd = true)
+ loadLargestTimestamp()
bytesTruncated
}
@@ -323,14 +360,12 @@ class LogSegment(val log: FileRecords,
@threadsafe
def nextOffset(): Long = {
val ms = read(index.lastOffset, None, log.sizeInBytes)
- if (ms == null) {
+ if (ms == null)
baseOffset
- } else {
- ms.records.batches.asScala.lastOption match {
- case None => baseOffset
- case Some(last) => last.nextOffset
- }
- }
+ else
+ ms.records.batches.asScala.lastOption
+ .map(_.nextOffset)
+ .getOrElse(baseOffset)
}
/**
@@ -342,6 +377,7 @@ class LogSegment(val log: FileRecords,
log.flush()
index.flush()
timeIndex.flush()
+ txnIndex.flush()
}
}
@@ -365,6 +401,10 @@ class LogSegment(val log: FileRecords,
catch {
case e: IOException => throw kafkaStorageException("timeindex", e)
}
+ try txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
+ catch {
+ case e: IOException => throw kafkaStorageException("txnindex", e)
+ }
}
/**
@@ -437,6 +477,7 @@ class LogSegment(val log: FileRecords,
CoreUtils.swallow(index.close())
CoreUtils.swallow(timeIndex.close())
CoreUtils.swallow(log.close())
+ CoreUtils.swallow(txnIndex.close())
}
/**
@@ -448,12 +489,15 @@ class LogSegment(val log: FileRecords,
val deletedLog = log.delete()
val deletedIndex = index.delete()
val deletedTimeIndex = timeIndex.delete()
- if(!deletedLog && log.file.exists)
+ val deletedTxnIndex = txnIndex.delete()
+ if (!deletedLog && log.file.exists)
throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
- if(!deletedIndex && index.file.exists)
+ if (!deletedIndex && index.file.exists)
throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
- if(!deletedTimeIndex && timeIndex.file.exists)
+ if (!deletedTimeIndex && timeIndex.file.exists)
throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
+ if (!deletedTxnIndex && txnIndex.file.exists)
+ throw new KafkaStorageException("Delete of transaction index " + txnIndex.file.getName + " failed.")
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index c1777d5..33257fd 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -52,24 +52,32 @@ private[kafka] object LogValidator extends Logging {
magic: Byte,
timestampType: TimestampType,
timestampDiffMaxMs: Long,
- partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+ partitionLeaderEpoch: Int,
+ isFromClient: Boolean): ValidationAndOffsetAssignResult = {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
if (!records.hasMatchingMagic(magic))
convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, timestampType,
- timestampDiffMaxMs, magic, partitionLeaderEpoch)
+ timestampDiffMaxMs, magic, partitionLeaderEpoch, isFromClient)
else
// Do in-place validation, offset assignment and maybe set timestamp
assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
- partitionLeaderEpoch)
+ partitionLeaderEpoch, isFromClient)
} else {
validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
- magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch)
+ magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
}
}
- private def validateBatch(batch: RecordBatch): Unit = {
- ensureNonTransactional(batch)
+ private def validateBatch(batch: RecordBatch, isFromClient: Boolean): Unit = {
+ if (isFromClient) {
+ if (batch.hasProducerId && batch.baseSequence < 0)
+ throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
+ s"with producerId ${batch.producerId}")
+
+ if (batch.isControlBatch)
+ throw new InvalidRecordException("Clients are not allowed to write control records")
+ }
}
private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType,
@@ -84,7 +92,6 @@ private[kafka] object LogValidator extends Logging {
if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed)
record.ensureValid()
- ensureNotControlRecord(record)
validateKey(record, compactedTopic)
validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs)
}
@@ -96,21 +103,22 @@ private[kafka] object LogValidator extends Logging {
timestampType: TimestampType,
timestampDiffMaxMs: Long,
toMagicValue: Byte,
- partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+ partitionLeaderEpoch: Int,
+ isFromClient: Boolean): ValidationAndOffsetAssignResult = {
val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
CompressionType.NONE, records.records)
- val (pid, epoch, sequence) = {
+ val (producerId, producerEpoch, sequence, isTransactional) = {
val first = records.batches.asScala.head
- (first.producerId, first.producerEpoch, first.baseSequence)
+ (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
}
val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
- offsetCounter.value, now, pid, epoch, sequence, false, partitionLeaderEpoch)
+ offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
for (batch <- records.batches.asScala) {
- validateBatch(batch)
+ validateBatch(batch, isFromClient)
for (record <- batch.asScala) {
validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
@@ -133,21 +141,21 @@ private[kafka] object LogValidator extends Logging {
compactedTopic: Boolean,
timestampType: TimestampType,
timestampDiffMaxMs: Long,
- partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+ partitionLeaderEpoch: Int,
+ isFromClient: Boolean): ValidationAndOffsetAssignResult = {
var maxTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
val initialOffset = offsetCounter.value
var isMagicV2 = false
for (batch <- records.batches.asScala) {
- validateBatch(batch)
+ validateBatch(batch, isFromClient)
var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxBatchTimestamp = -1L
for (record <- batch.asScala) {
validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
-
val offset = offsetCounter.getAndIncrement()
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && record.timestamp > maxBatchTimestamp) {
maxBatchTimestamp = record.timestamp
@@ -206,7 +214,8 @@ private[kafka] object LogValidator extends Logging {
magic: Byte,
timestampType: TimestampType,
timestampDiffMaxMs: Long,
- partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+ partitionLeaderEpoch: Int,
+ isFromClient: Boolean): ValidationAndOffsetAssignResult = {
// No in place assignment situation 1 and 2
var inPlaceAssignment = sourceCodec == targetCodec && magic > RecordBatch.MAGIC_VALUE_V0
@@ -216,14 +225,17 @@ private[kafka] object LogValidator extends Logging {
val validatedRecords = new mutable.ArrayBuffer[Record]
for (batch <- records.batches.asScala) {
- validateBatch(batch)
+ validateBatch(batch, isFromClient)
+
+ // Do not compress control records unless they are written compressed
+ if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
+ inPlaceAssignment = true
for (record <- batch.asScala) {
validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
if (sourceCodec != NoCompressionCodec && record.isCompressed)
throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
s"compression attribute set: $record")
-
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && magic > RecordBatch.MAGIC_VALUE_V0) {
// Check if we need to overwrite offset
// No in place assignment situation 3
@@ -242,15 +254,15 @@ private[kafka] object LogValidator extends Logging {
}
if (!inPlaceAssignment) {
- val (pid, epoch, sequence) = {
+ val (producerId, producerEpoch, sequence, isTransactional) = {
// note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
// there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
// with older magic versions, there will never be a producer id, etc.
val first = records.batches.asScala.head
- (first.producerId, first.producerEpoch, first.baseSequence)
+ (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
}
buildRecordsAndAssignOffsets(magic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
- validatedRecords, pid, epoch, sequence, partitionLeaderEpoch)
+ validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
} else {
// we can update the batch only and write the compressed payload as is
val batch = records.batches.iterator.next()
@@ -274,14 +286,22 @@ private[kafka] object LogValidator extends Logging {
}
}
- private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType,
- compressionType: CompressionType, logAppendTime: Long,
+ private def buildRecordsAndAssignOffsets(magic: Byte,
+ offsetCounter: LongRef,
+ timestampType: TimestampType,
+ compressionType: CompressionType,
+ logAppendTime: Long,
validatedRecords: Seq[Record],
- producerId: Long, epoch: Short, baseSequence: Int, partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
- val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava)
+ producerId: Long,
+ producerEpoch: Short,
+ baseSequence: Int,
+ isTransactional: Boolean,
+ partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
+ val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType,
+ validatedRecords.asJava)
val buffer = ByteBuffer.allocate(estimatedSize)
val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
- logAppendTime, producerId, epoch, baseSequence, false, partitionLeaderEpoch)
+ logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch)
validatedRecords.foreach { record =>
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
@@ -297,17 +317,6 @@ private[kafka] object LogValidator extends Logging {
messageSizeMaybeChanged = true)
}
- private def ensureNonTransactional(batch: RecordBatch) {
- if (batch.isTransactional)
- throw new InvalidRecordException("Transactional messages are not currently supported")
- }
-
- private def ensureNotControlRecord(record: Record) {
- // Until we have implemented transaction support, we do not permit control records to be written
- if (record.isControlRecord)
- throw new InvalidRecordException("Control messages are not currently supported")
- }
-
private def validateKey(record: Record, compactedTopic: Boolean) {
if (compactedTopic && !record.hasKey)
throw new InvalidRecordException("Compacted topic cannot accept message without key.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index a54579f..e4939e8 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -85,7 +85,7 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
val idx = mmap.duplicate
- val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)
+ val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
@@ -93,6 +93,22 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable
}
}
+ /**
+ * Find an upper bound offset for the given fetch starting position and size. This is an offset which
+ * is guaranteed to be outside the fetched range, but note that it will not generally be the smallest
+ * such offset.
+ */
+ def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = {
+ maybeLock(lock) {
+ val idx = mmap.duplicate
+ val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE)
+ if (slot == -1)
+ None
+ else
+ Some(parseEntry(idx, slot).asInstanceOf[OffsetPosition])
+ }
+ }
+
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
@@ -140,7 +156,7 @@ class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable
override def truncateTo(offset: Long) {
inLock(lock) {
val idx = mmap.duplicate
- val slot = indexSlotFor(idx, offset, IndexSearchType.KEY)
+ val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY)
/* There are 3 cases for choosing the new size
* 1) if there is no entry in the index <= the offset, delete everything