Posted to commits@kafka.apache.org by ju...@apache.org on 2016/08/19 17:07:12 UTC
[2/2] kafka git commit: KAFKA-3163; Add time based index to Kafka.
KAFKA-3163; Add time based index to Kafka.
This patch is for KIP-33.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>, Liquan Pei <li...@gmail.com>
Closes #1215 from becketqin/KAFKA-3163
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79d3fd2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79d3fd2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79d3fd2b
Branch: refs/heads/trunk
Commit: 79d3fd2bf0e5c89ff74a2988c403882ae8a9852e
Parents: 05ed54b
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Aug 19 10:07:07 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 19 10:07:07 2016 -0700
----------------------------------------------------------------------
.../main/scala/kafka/log/AbstractIndex.scala | 287 +++++++++++++++++
.../main/scala/kafka/log/FileMessageSet.scala | 78 ++++-
core/src/main/scala/kafka/log/IndexEntry.scala | 46 +++
core/src/main/scala/kafka/log/Log.scala | 196 +++++++++---
core/src/main/scala/kafka/log/LogCleaner.scala | 41 ++-
core/src/main/scala/kafka/log/LogManager.scala | 6 +-
core/src/main/scala/kafka/log/LogSegment.scala | 169 ++++++++--
core/src/main/scala/kafka/log/OffsetIndex.scala | 306 +++----------------
.../main/scala/kafka/log/OffsetPosition.scala | 25 --
core/src/main/scala/kafka/log/TimeIndex.scala | 208 +++++++++++++
.../kafka/message/ByteBufferMessageSet.scala | 91 ++++--
.../scala/kafka/message/MessageAndOffset.scala | 8 +
.../src/main/scala/kafka/server/KafkaApis.scala | 3 +-
.../scala/kafka/server/ReplicaManager.scala | 2 +-
.../scala/kafka/tools/DumpLogSegments.scala | 122 +++++++-
.../test/scala/unit/kafka/log/CleanerTest.scala | 5 +-
.../unit/kafka/log/FileMessageSetTest.scala | 16 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 4 +-
.../scala/unit/kafka/log/LogSegmentTest.scala | 114 +++++--
.../src/test/scala/unit/kafka/log/LogTest.scala | 152 +++++++--
.../scala/unit/kafka/log/TimeIndexTest.scala | 97 ++++++
.../message/ByteBufferMessageSetTest.scala | 135 +++++---
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +-
docs/upgrade.html | 16 +-
24 files changed, 1631 insertions(+), 499 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
new file mode 100644
index 0000000..d594f18
--- /dev/null
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.{File, RandomAccessFile}
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.nio.channels.FileChannel
+import java.util.concurrent.locks.{Lock, ReentrantLock}
+
+import kafka.log.IndexSearchType.IndexSearchEntity
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.{CoreUtils, Logging, Os}
+import org.apache.kafka.common.utils.Utils
+import sun.nio.ch.DirectBuffer
+
+import scala.math.ceil
+
+/**
+ * The abstract index class which holds entry format agnostic methods.
+ *
+ * @param _file The index file
+ * @param baseOffset The base offset of the segment that this index corresponds to.
+ * @param maxIndexSize The maximum index size in bytes.
+ */
+abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
+ extends Logging {
+
+ protected def entrySize: Int
+
+ protected val lock = new ReentrantLock
+
+ @volatile
+ protected var mmap: MappedByteBuffer = {
+ val newlyCreated = _file.createNewFile()
+ val raf = new RandomAccessFile(_file, "rw")
+ try {
+ /* pre-allocate the file if necessary */
+ if(newlyCreated) {
+ if(maxIndexSize < entrySize)
+ throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+ raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
+ }
+
+ /* memory-map the file */
+ val len = raf.length()
+ val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+
+ /* set the position in the index for the next entry */
+ if(newlyCreated)
+ idx.position(0)
+ else
+ // if this is a pre-existing index, assume it is valid and set position to last entry
+ idx.position(roundDownToExactMultiple(idx.limit, entrySize))
+ idx
+ } finally {
+ CoreUtils.swallow(raf.close())
+ }
+ }
+
+ /**
+ * The maximum number of entries this index can hold
+ */
+ @volatile
+ private[this] var _maxEntries = mmap.limit / entrySize
+
+ /** The number of entries in this index */
+ @volatile
+ protected var _entries = mmap.position / entrySize
+
+ /**
+ * True iff there are no more slots available in this index
+ */
+ def isFull: Boolean = _entries >= _maxEntries
+
+ def maxEntries: Int = _maxEntries
+
+ def entries: Int = _entries
+
+ /**
+ * The index file
+ */
+ def file: File = _file
+
+ /**
+ * Reset the size of the memory map and the underlying file. This is used in two cases: (1) in
+ * trimToValidSize(), which is called when the segment is closed or a new segment is rolled;
+ * (2) when loading segments from disk or truncating back to an old segment where a new log segment
+ * becomes active; in that case we want to reset the index size to the maximum index size to avoid
+ * rolling a new segment.
+ */
+ def resize(newSize: Int) {
+ inLock(lock) {
+ val raf = new RandomAccessFile(_file, "rw")
+ val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
+ val position = mmap.position
+
+ /* Windows won't let us modify the file length while the file is mmapped :-( */
+ if(Os.isWindows)
+ forceUnmap(mmap)
+ try {
+ raf.setLength(roundedNewSize)
+ mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+ _maxEntries = mmap.limit / entrySize
+ mmap.position(position)
+ } finally {
+ CoreUtils.swallow(raf.close())
+ }
+ }
+ }
+
+ /**
+ * Rename the file that backs this offset index
+ *
+ * @throws IOException if rename fails
+ */
+ def renameTo(f: File) {
+ try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
+ finally _file = f
+ }
+
+ /**
+ * Flush the data in the index to disk
+ */
+ def flush() {
+ inLock(lock) {
+ mmap.force()
+ }
+ }
+
+ /**
+ * Delete this index file
+ */
+ def delete(): Boolean = {
+ info(s"Deleting index ${_file.getAbsolutePath}")
+ if(Os.isWindows)
+ CoreUtils.swallow(forceUnmap(mmap))
+ _file.delete()
+ }
+
+ /**
+ * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
+ * the file.
+ */
+ def trimToValidSize() {
+ inLock(lock) {
+ resize(entrySize * _entries)
+ }
+ }
+
+ /**
+ * The number of bytes actually used by this index
+ */
+ def sizeInBytes = entrySize * _entries
+
+ /** Close the index */
+ def close() {
+ trimToValidSize()
+ }
+
+ /**
+ * Do a basic sanity check on this index to detect obvious problems
+ *
+ * @throws IllegalArgumentException if any problems are found
+ */
+ def sanityCheck(): Unit
+
+ /**
+ * Remove all the entries from the index.
+ */
+ def truncate(): Unit
+
+ /**
+ * Remove all entries from the index which have an offset greater than or equal to the given offset.
+ * Truncating to an offset larger than the largest in the index has no effect.
+ */
+ def truncateTo(offset: Long): Unit
+
+ /**
+ * Forcefully free the buffer's mmap. We do this only on Windows.
+ */
+ protected def forceUnmap(m: MappedByteBuffer) {
+ try {
+ m match {
+ case buffer: DirectBuffer =>
+ val bufferCleaner = buffer.cleaner()
+ /* cleaner can be null if the mapped region has size 0 */
+ if (bufferCleaner != null)
+ bufferCleaner.clean()
+ case _ =>
+ }
+ } catch {
+ case t: Throwable => error("Error when freeing index buffer", t)
+ }
+ }
+
+ /**
+ * Execute the given function in a lock only if we are running on Windows. We do this
+ * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it,
+ * and this requires synchronizing reads.
+ */
+ protected def maybeLock[T](lock: Lock)(fun: => T): T = {
+ if(Os.isWindows)
+ lock.lock()
+ try {
+ fun
+ } finally {
+ if(Os.isWindows)
+ lock.unlock()
+ }
+ }
+
+ /**
+ * Parse an entry in the index.
+ *
+ * @param buffer the buffer of this memory mapped index.
+ * @param n the slot
+ * @return the index entry stored in the given slot.
+ */
+ protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry
+
+ /**
+ * Find the slot in which the largest entry less than or equal to the given target key or value is stored.
+ * The comparison is made using the `IndexEntry.compareTo()` method.
+ *
+ * @param idx The index buffer
+ * @param target The index key to look for
+ * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
+ */
+ protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
+ // check if the index is empty
+ if(_entries == 0)
+ return -1
+
+ // check if the target offset is smaller than the least offset
+ if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
+ return -1
+
+ // binary search for the entry
+ var lo = 0
+ var hi = _entries - 1
+ while(lo < hi) {
+ val mid = ceil(hi/2.0 + lo/2.0).toInt
+ val found = parseEntry(idx, mid)
+ val compareResult = compareIndexEntry(found, target, searchEntity)
+ if(compareResult > 0)
+ hi = mid - 1
+ else if(compareResult < 0)
+ lo = mid
+ else
+ return mid
+ }
+ lo
+ }
+
+ private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
+ searchEntity match {
+ case IndexSearchType.KEY => indexEntry.indexKey.compareTo(target)
+ case IndexSearchType.VALUE => indexEntry.indexValue.compareTo(target)
+ }
+ }
+
+ /**
+ * Round a number to the greatest exact multiple of the given factor less than or equal to the given number.
+ * E.g. roundDownToExactMultiple(67, 8) == 64
+ */
+ private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
+
+}
+
+object IndexSearchType extends Enumeration {
+ type IndexSearchEntity = Value
+ val KEY, VALUE = Value
+}
\ No newline at end of file
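The heart of AbstractIndex is the sparse lookup above: indexSlotFor returns the slot of the largest entry whose key (or value) is at most the target, and callers then scan the log forward from there. A minimal, self-contained Scala sketch of the same "largest entry <= target" search over a plain Array[Long] (illustrative names, not code from the patch):

    object SlotSearchSketch {
      // Slot of the largest key <= target, or -1 if the target precedes all keys.
      def slotFor(keys: Array[Long], target: Long): Int = {
        if (keys.isEmpty || keys(0) > target) return -1
        var lo = 0
        var hi = keys.length - 1
        while (lo < hi) {
          // Written as in the patch to avoid Int overflow on (lo + hi).
          val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
          if (keys(mid) > target) hi = mid - 1
          else if (keys(mid) < target) lo = mid
          else return mid
        }
        lo
      }

      def main(args: Array[String]): Unit = {
        val keys = Array(0L, 10L, 20L, 30L)
        println(slotFor(keys, 25)) // 2: entry 20 is the largest key <= 25
        println(slotFor(keys, -5)) // -1: target precedes the least entry
      }
    }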
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 8e92f95..5763042 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -117,7 +117,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
new FileMessageSet(file,
channel,
start = this.start + position,
- end = math.min(this.start + position + size, sizeInBytes()))
+ end = {
+ // Handle the integer overflow
+ if (this.start + position + size < 0)
+ sizeInBytes()
+ else
+ math.min(this.start + position + size, sizeInBytes())
+ })
}
/**
@@ -126,7 +132,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
- def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
+ def searchForOffset(targetOffset: Long, startingPosition: Int): OffsetPosition = {
var position = startingPosition
val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
val size = sizeInBytes()
@@ -135,7 +141,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
channel.read(buffer, position)
if(buffer.hasRemaining)
throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
- .format(targetOffset, startingPosition, file.getAbsolutePath))
+ .format(targetOffset, startingPosition, file.getAbsolutePath))
buffer.rewind()
val offset = buffer.getLong()
if(offset >= targetOffset)
@@ -149,6 +155,72 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
/**
+ * Search forward for the message whose timestamp is greater than or equal to the target timestamp.
+ *
+ * The search will stop immediately when it sees a message in a format version before 0.10.0. This is to avoid
+ * scanning the entire log when all the messages are still in the old format.
+ *
+ * @param targetTimestamp The timestamp to search for.
+ * @param startingPosition The starting position to search.
+ * @return None, if no message exists at or after the starting position.
+ * Some(the_next_offset_to_read) otherwise.
+ */
+ def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[Long] = {
+ var maxTimestampChecked = Message.NoTimestamp
+ var lastOffsetChecked = -1L
+ val messagesToSearch = read(startingPosition, sizeInBytes)
+ for (messageAndOffset <- messagesToSearch) {
+ val message = messageAndOffset.message
+ lastOffsetChecked = messageAndOffset.offset
+ // Stop searching once we see a message format before 0.10.0.
+ // This is equivalent to treating a message without a timestamp as having the largest timestamp.
+ // We do this to avoid scanning the entire log if no message has a timestamp.
+ if (message.magic == Message.MagicValue_V0)
+ return Some(messageAndOffset.offset)
+ else if (message.timestamp >= targetTimestamp) {
+ // We found a message
+ message.compressionCodec match {
+ case NoCompressionCodec =>
+ return Some(messageAndOffset.offset)
+ case _ =>
+ // Iterate over the inner messages to get the exact offset.
+ for (innerMessage <- ByteBufferMessageSet.deepIterator(messageAndOffset)) {
+ val timestamp = innerMessage.message.timestamp
+ if (timestamp >= targetTimestamp)
+ return Some(innerMessage.offset)
+ }
+ throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset})" +
+ s" should contain target timestamp $targetTimestamp but it does not.")
+ }
+ } else
+ maxTimestampChecked = math.max(maxTimestampChecked, message.timestamp)
+ }
+
+ if (lastOffsetChecked >= 0)
+ Some(lastOffsetChecked + 1)
+ else
+ None
+ }
+
+ /**
+ * Return the largest timestamp of the messages after a given position in this file message set.
+ * @param startingPosition The starting position.
+ * @return The largest timestamp of the messages after the given position.
+ */
+ def largestTimestampAfter(startingPosition: Int): TimestampOffset = {
+ var maxTimestamp = Message.NoTimestamp
+ var offsetOfMaxTimestamp = -1L
+ val messagesToSearch = read(startingPosition, Int.MaxValue)
+ for (messageAndOffset <- messagesToSearch) {
+ if (messageAndOffset.message.timestamp > maxTimestamp) {
+ maxTimestamp = messageAndOffset.message.timestamp
+ offsetOfMaxTimestamp = messageAndOffset.offset
+ }
+ }
+ TimestampOffset(maxTimestamp, offsetOfMaxTimestamp)
+ }
+
+ /**
* Write some of this set to the given channel.
* @param destChannel The channel to write to.
* @param writePosition The position in the message set to begin writing from.
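The overflow guard added to read(...) above exists because `this.start + position + size` is evaluated in Int arithmetic, so a caller passing a size near Int.MaxValue wraps the sum negative. A tiny self-contained illustration (hypothetical values):

    object OverflowSketch {
      def main(args: Array[String]): Unit = {
        val start = 100
        val position = 50
        val size = Int.MaxValue
        val end = start + position + size // wraps around to a negative Int
        println(end)     // -2147483499
        println(end < 0) // true, so read(...) falls back to sizeInBytes()
      }
    }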
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/IndexEntry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/IndexEntry.scala b/core/src/main/scala/kafka/log/IndexEntry.scala
new file mode 100644
index 0000000..2f5a6a7
--- /dev/null
+++ b/core/src/main/scala/kafka/log/IndexEntry.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+sealed trait IndexEntry {
+ // We always use Long for both key and value to avoid boxing.
+ def indexKey: Long
+ def indexValue: Long
+}
+
+/**
+ * The mapping between a logical log offset and the physical position
+ * in some log file of the beginning of the message set entry with the
+ * given offset.
+ */
+case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
+ override def indexKey = offset
+ override def indexValue = position.toLong
+}
+
+
+/**
+ * The mapping from a timestamp to a message offset. The entry means that any message whose timestamp is greater
+ * than the given timestamp must be at or after the given offset.
+ * @param timestamp The max timestamp before the given offset.
+ * @param offset The message offset.
+ */
+case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
+ override def indexKey = timestamp
+ override def indexValue = offset
+}
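Both concrete entry types expose their (key, value) pair as Longs, which is what AbstractIndex compares during lookups. A short usage sketch, assuming the two case classes above are in scope (the values are hypothetical):

    // OffsetPosition maps a logical offset to a physical file position.
    val op = OffsetPosition(offset = 1000L, position = 4096)
    assert(op.indexKey == 1000L && op.indexValue == 4096L)

    // TimestampOffset records that any message with a timestamp greater than
    // `timestamp` must live at or after `offset`.
    val to = TimestampOffset(timestamp = 1471625227000L, offset = 1000L)
    assert(to.indexKey == to.timestamp && to.indexValue == to.offset)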
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 1a7719a..b4aa470 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,6 +17,7 @@
package kafka.log
+import kafka.api.OffsetRequest
import kafka.utils._
import kafka.message._
import kafka.common._
@@ -30,19 +31,22 @@ import java.text.NumberFormat
import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
import org.apache.kafka.common.record.TimestampType
-import scala.collection.JavaConversions
+import scala.collection.{Seq, JavaConversions}
import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.utils.Utils
object LogAppendInfo {
- val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+ val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
}
/**
* Struct to hold various quantities we compute about each message set before appending to the log
+ *
* @param firstOffset The first offset in the message set
* @param lastOffset The last offset in the message set
- * @param timestamp The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param maxTimestamp The maximum timestamp of the message set.
+ * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
+ * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
* @param sourceCodec The source codec used in the message set (sent by the producer)
* @param targetCodec The target codec of the message set (after applying the broker compression configuration if any)
* @param shallowCount The number of shallow messages
@@ -51,7 +55,9 @@ object LogAppendInfo {
*/
case class LogAppendInfo(var firstOffset: Long,
var lastOffset: Long,
- var timestamp: Long,
+ var maxTimestamp: Long,
+ var offsetOfMaxTimestamp: Long,
+ var logAppendTime: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
shallowCount: Int,
@@ -95,7 +101,7 @@ class Log(val dir: File,
else
0
}
-
+ val t = time.milliseconds
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
loadSegments()
@@ -105,7 +111,8 @@ class Log(val dir: File,
val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
- info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+ info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
+ .format(name, segments.size(), logEndOffset, time.milliseconds - t))
val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
@@ -167,12 +174,17 @@ class Log(val dir: File,
}
}
- // now do a second pass and load all the .log and .index files
+ // now do a second pass and load all the .log and all index files
for(file <- dir.listFiles if file.isFile) {
val filename = file.getName
- if(filename.endsWith(IndexFileSuffix)) {
+ if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
// if it is an index file, make sure it has a corresponding .log file
- val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+ val logFile =
+ if (filename.endsWith(TimeIndexFileSuffix))
+ new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
+ else
+ new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+
if(!logFile.exists) {
warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
file.delete()
@@ -181,6 +193,9 @@ class Log(val dir: File,
// if its a log file, load the corresponding log segment
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
val indexFile = Log.indexFilename(dir, start)
+ val timeIndexFile = Log.timeIndexFilename(dir, start)
+
+ val indexFileExists = indexFile.exists()
val segment = new LogSegment(dir = dir,
startOffset = start,
indexIntervalBytes = config.indexInterval,
@@ -189,20 +204,23 @@ class Log(val dir: File,
time = time,
fileAlreadyExists = true)
- if(indexFile.exists()) {
+ if (indexFileExists) {
try {
- segment.index.sanityCheck()
+ segment.index.sanityCheck()
+ segment.timeIndex.sanityCheck()
} catch {
case e: java.lang.IllegalArgumentException =>
- warn("Found a corrupted index file, %s, deleting and rebuilding index. Error Message: %s".format(indexFile.getAbsolutePath, e.getMessage))
+ warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
+ s"${indexFile.getAbsolutePath} and rebuilding index...")
indexFile.delete()
+ timeIndexFile.delete()
segment.recover(config.maxMessageSize)
}
- }
- else {
+ } else {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(config.maxMessageSize)
}
+
segments.put(start, segment)
}
}
@@ -216,8 +234,11 @@ class Log(val dir: File,
val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+ val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
+ val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
index = index,
+ timeIndex = timeIndex,
baseOffset = startOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
@@ -243,6 +264,7 @@ class Log(val dir: File,
recoverLog()
// reset the index size of the currently active log segment to allow more entries
activeSegment.index.resize(config.maxIndexSize)
+ activeSegment.timeIndex.resize(config.maxIndexSize)
}
}
@@ -298,8 +320,7 @@ class Log(val dir: File,
def close() {
debug("Closing log " + name)
lock synchronized {
- for(seg <- logSegments)
- seg.close()
+ logSegments.foreach(_.close())
}
}
@@ -311,9 +332,7 @@ class Log(val dir: File,
*
* @param messages The message set to append
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
- *
* @throws KafkaStorageException If the append fails due to an I/O error.
- *
* @return Information about the appended messages including the first and last offset.
*/
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
@@ -335,7 +354,7 @@ class Log(val dir: File,
val offset = new LongRef(nextOffsetMetadata.messageOffset)
appendInfo.firstOffset = offset.value
val now = time.milliseconds
- val (validatedMessages, messageSizesMaybeChanged) = try {
+ val validateAndOffsetAssignResult = try {
validMessages.validateMessagesAndAssignOffsets(offset,
now,
appendInfo.sourceCodec,
@@ -347,14 +366,16 @@ class Log(val dir: File,
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
- validMessages = validatedMessages
+ validMessages = validateAndOffsetAssignResult.validatedMessages
+ appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
+ appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.offsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
- appendInfo.timestamp = now
+ appendInfo.logAppendTime = now
// re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
// format conversion)
- if (messageSizesMaybeChanged) {
+ if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
for (messageAndOffset <- validMessages.shallowIterator) {
if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
@@ -383,7 +404,8 @@ class Log(val dir: File,
val segment = maybeRoll(validMessages.sizeInBytes)
// now append to the log
- segment.append(appendInfo.firstOffset, validMessages)
+ segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
+ offsetOfLargestTimestamp = appendInfo.offsetOfMaxTimestamp, messages = validMessages)
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
@@ -424,6 +446,8 @@ class Log(val dir: File,
var firstOffset, lastOffset = -1L
var sourceCodec: CompressionCodec = NoCompressionCodec
var monotonic = true
+ var maxTimestamp = Message.NoTimestamp
+ var offsetOfMaxTimestamp = -1L
for(messageAndOffset <- messages.shallowIterator) {
// update the first offset if on the first message
if(firstOffset < 0)
@@ -447,7 +471,10 @@ class Log(val dir: File,
// check the validity of the message by checking CRC
m.ensureValid()
-
+ if (m.timestamp > maxTimestamp) {
+ maxTimestamp = m.timestamp
+ offsetOfMaxTimestamp = lastOffset
+ }
shallowMessageCount += 1
validBytesCount += messageSize
@@ -459,11 +486,12 @@ class Log(val dir: File,
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
- LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+ LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}
/**
* Trim any invalid bytes from the end of this message set (if there are any)
+ *
* @param messages The message set to trim
* @param info The general information of the message set
* @return A trimmed message set. This may be the same as what was passed in or it may not.
@@ -544,6 +572,71 @@ class Log(val dir: File,
}
/**
+ * Get an offset based on the given timestamp.
+ * The offset returned is the offset of the first message whose timestamp is greater than or equal to the
+ * given timestamp.
+ *
+ * If no such message is found, the log end offset is returned.
+ *
+ * `NOTE:` OffsetRequest V0 does not use this method; its behavior remains the same as before,
+ * i.e. it only gives back the timestamp based on the last modification time of the log segments.
+ *
+ * @param timestamp The given timestamp for offset fetching.
+ * @return The offset of the first message whose timestamp is greater than or equal to the given timestamp.
+ */
+ def fetchOffsetsByTimestamp(timestamp: Long): Long = {
+ debug(s"Searching offset for timestamp $timestamp")
+ val segsArray = logSegments.toArray
+ if (timestamp == OffsetRequest.EarliestTime)
+ return segsArray(0).baseOffset
+
+ // set the target timestamp to be Long.MaxValue if we need to find from the latest.
+ val targetTimestamp = timestamp match {
+ case OffsetRequest.LatestTime => Long.MaxValue
+ case _ => timestamp
+ }
+
+ var foundOffset: Long = -1L
+ // This while loop handles the case where the log is truncated during the timestamp search and no
+ // message is found. In that case we retry the search, so that to the best of our knowledge we
+ // return a valid offset.
+ do {
+ val targetSeg = {
+ // Get all the segments whose largest timestamp is smaller than target timestamp
+ val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp)
+ // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
+ if (earlierSegs.length < segsArray.length)
+ segsArray(earlierSegs.length)
+ else
+ earlierSegs.last
+ }
+
+ // First cache the current log end offset
+ val leo = logEndOffset
+ foundOffset = {
+ // Use the cached log end offsets if
+ // 1. user is asking for latest messages, or,
+ // 2. we are searching on the active segment and the target timestamp is greater than the largestTimestamp
+ // after we cached the log end offset. (We have to use the cached log end offsets because it is possible that
+ // some messages with a larger timestamp are appended after we check the largest timestamp. Using log end offset
+ // after the timestamp check might skip those messages.)
+ if (targetTimestamp == Long.MaxValue
+ || (targetTimestamp > targetSeg.largestTimestamp && targetSeg == activeSegment))
+ leo
+ else
+ // The findOffsetByTimestamp() method may return None when the log is truncated during the timestamp search.
+ // In that case we simply set the foundOffset to -1 so that we will search the timestamp again in the
+ // while loop.
+ targetSeg.findOffsetByTimestamp(targetTimestamp) match {
+ case Some(offset) => offset
+ case None => -1L
+ }
+ }
+ } while (foundOffset < 0)
+ foundOffset
+ }
+
+ /**
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, return unknown offset metadata
*/
@@ -559,6 +652,7 @@ class Log(val dir: File,
/**
* Delete any log segments matching the given predicate function,
* starting with the oldest segment and moving forward until a segment doesn't match.
+ *
* @param predicate A function that takes in a single log segment and returns true iff it is deletable
* @return The number of segments deleted
*/
@@ -609,24 +703,22 @@ class Log(val dir: File,
* logSegment will be rolled if one of the following conditions met
* <ol>
* <li> The logSegment is full
- * <li> The maxTime has elapsed
+ * <li> The maxTime has elapsed since the timestamp of the first message in the segment (or since the create time if
+ * the first message does not have a timestamp)
* <li> The index is full
* </ol>
* @return The currently active segment after (perhaps) rolling to a new segment
*/
private def maybeRoll(messagesSize: Int): LogSegment = {
val segment = activeSegment
+ val reachedRollMs = segment.timeWaitedForRoll(time.milliseconds) > config.segmentMs - segment.rollJitterMs
if (segment.size > config.segmentSize - messagesSize ||
- segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
- segment.index.isFull) {
- debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
- .format(name,
- segment.size,
- config.segmentSize,
- segment.index.entries,
- segment.index.maxEntries,
- time.milliseconds - segment.created,
- config.segmentMs - segment.rollJitterMs))
+ (segment.size > 0 && reachedRollMs) ||
+ segment.index.isFull || segment.timeIndex.isFull) {
+ debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
+ s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
+ s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
+ s"inactive_time_ms = ${segment.timeWaitedForRoll(time.milliseconds)}/${config.segmentMs - segment.rollJitterMs}).")
roll()
} else {
segment
@@ -636,6 +728,7 @@ class Log(val dir: File,
/**
* Roll the log over to a new active segment starting with the current logEndOffset.
* This will trim the index to the exact size of the number of entries it currently contains.
+ *
* @return The newly rolled segment
*/
def roll(): LogSegment = {
@@ -644,7 +737,8 @@ class Log(val dir: File,
val newOffset = logEndOffset
val logFile = logFilename(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
- for(file <- List(logFile, indexFile); if file.exists) {
+ val timeIndexFile = timeIndexFilename(dir, newOffset)
+ for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
}
@@ -652,8 +746,11 @@ class Log(val dir: File,
segments.lastEntry() match {
case null =>
case entry => {
- entry.getValue.index.trimToValidSize()
- entry.getValue.log.trim()
+ val seg = entry.getValue
+ seg.onBecomeInactiveSegment()
+ seg.index.trimToValidSize()
+ seg.timeIndex.trimToValidSize()
+ seg.log.trim()
}
}
val segment = new LogSegment(dir,
@@ -692,6 +789,7 @@ class Log(val dir: File,
/**
* Flush log segments for all offsets up to offset-1
+ *
* @param offset The offset to flush up to (non-inclusive); the new recovery point
*/
def flush(offset: Long) : Unit = {
@@ -723,6 +821,7 @@ class Log(val dir: File,
/**
* Truncate this log so that it ends with the greatest offset < targetOffset.
+ *
* @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
*/
private[log] def truncateTo(targetOffset: Long) {
@@ -748,6 +847,7 @@ class Log(val dir: File,
/**
* Delete all data in the log and start at the new offset
+ *
* @param newOffset The new offset to start the log with
*/
private[log] def truncateFullyAndStartAt(newOffset: Long) {
@@ -826,6 +926,7 @@ class Log(val dir: File,
/**
* Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+ *
* @throws KafkaStorageException if the file can't be renamed and still exists
*/
private def asyncDeleteSegment(segment: LogSegment) {
@@ -893,6 +994,7 @@ class Log(val dir: File,
}
/**
* Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
+ *
* @param segment The segment to add
*/
def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
@@ -910,6 +1012,9 @@ object Log {
/** an index file */
val IndexFileSuffix = ".index"
+ /** a time index file */
+ val TimeIndexFileSuffix = ".timeindex"
+
/** a file that is scheduled to be deleted */
val DeletedFileSuffix = ".deleted"
@@ -920,13 +1025,14 @@ object Log {
val SwapFileSuffix = ".swap"
/** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility
- * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
+ * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
/** TODO: Get rid of CleanShutdownFile in 0.8.2 */
val CleanShutdownFile = ".kafka_cleanshutdown"
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
+ *
* @param offset The offset to use in the file name
* @return The filename
*/
@@ -940,6 +1046,7 @@ object Log {
/**
* Construct a log file name in the given dir with the given base offset
+ *
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
@@ -948,12 +1055,21 @@ object Log {
/**
* Construct an index file name in the given dir using the given base offset
+ *
* @param dir The directory in which the log will reside
* @param offset The base offset of the log file
*/
def indexFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
+ /**
+ * Construct a time index file name in the given dir using the given base offset
+ *
+ * @param dir The directory in which the log will reside
+ * @param offset The base offset of the log file
+ */
+ def timeIndexFilename(dir: File, offset: Long) =
+ new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
/**
* Parse the topic and partition out of the directory name of a log
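Taken together, the Log.scala changes make timestamp-based offset lookup available to the fetch path. A hedged usage sketch of the new method (`log` is assumed to be an initialized kafka.log.Log; the timestamp value is hypothetical):

    import kafka.api.OffsetRequest

    val earliest = log.fetchOffsetsByTimestamp(OffsetRequest.EarliestTime) // base offset of the first segment
    val latest   = log.fetchOffsetsByTimestamp(OffsetRequest.LatestTime)   // the log end offset
    // First offset whose message timestamp is >= the target, or the log end
    // offset if no such message exists.
    val at       = log.fetchOffsetsByTimestamp(1471625227000L)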
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 25c36e7..d4bb1f2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -334,7 +334,7 @@ private[log] class Cleaner(val id: Int,
val deleteHorizonMs =
log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
- case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
+ case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
}
// group the segments and clean the groups
@@ -366,23 +366,32 @@ private[log] class Cleaner(val id: Int,
val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
logFile.delete()
val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
+ val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
indexFile.delete()
+ timeIndexFile.delete()
val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
- val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
+ val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize)
+ val cleaned = new LogSegment(messages, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
try {
// clean segments into the new destination segment
for (old <- segments) {
- val retainDeletes = old.lastModified > deleteHorizonMs
- info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
- .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
+ val retainDeletes = old.largestTimestamp > deleteHorizonMs
+ info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
+ .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion)
}
// trim excess index
index.trimToValidSize()
+ // Append the last index entry
+ cleaned.onBecomeInactiveSegment()
+
+ // trim time index
+ timeIndex.trimToValidSize()
+
// flush new segment to disk before swap
cleaned.flush()
@@ -422,6 +431,8 @@ private[log] class Cleaner(val id: Int,
// read a chunk of messages and copy any that are to be retained to the write buffer to be written out
readBuffer.clear()
writeBuffer.clear()
+ var maxTimestamp = Message.NoTimestamp
+ var offsetOfMaxTimestamp = -1L
val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
throttler.maybeThrottle(messages.sizeInBytes)
// check each message to see if it is to be retained
@@ -433,6 +444,10 @@ private[log] class Cleaner(val id: Int,
if (shouldRetainMessage(source, map, retainDeletes, entry)) {
ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
stats.recopyMessage(size)
+ if (entry.message.timestamp > maxTimestamp) {
+ maxTimestamp = entry.message.timestamp
+ offsetOfMaxTimestamp = entry.offset
+ }
}
messagesRead += 1
} else {
@@ -443,12 +458,16 @@ private[log] class Cleaner(val id: Int,
val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
messages.foreach { messageAndOffset =>
messagesRead += 1
- if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset))
+ if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) {
retainedMessages += messageAndOffset
+ // We need the max timestamp and last offset for time index
+ if (messageAndOffset.message.timestamp > maxTimestamp)
+ maxTimestamp = messageAndOffset.message.timestamp
+ }
else writeOriginalMessageSet = false
}
-
- // There are no messages compacted out, write the original message set back
+ offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
+ // No messages were compacted out and no message format conversion happened, so write the original message set back
if (writeOriginalMessageSet)
ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
else
@@ -461,7 +480,8 @@ private[log] class Cleaner(val id: Int,
if (writeBuffer.position > 0) {
writeBuffer.flip()
val retained = new ByteBufferMessageSet(writeBuffer)
- dest.append(retained.head.offset, retained)
+ dest.append(firstOffset = retained.head.offset, largestTimestamp = maxTimestamp,
+ offsetOfLargestTimestamp = offsetOfMaxTimestamp, messages = retained)
throttler.maybeThrottle(writeBuffer.limit)
}
@@ -569,14 +589,17 @@ private[log] class Cleaner(val id: Int,
var group = List(segs.head)
var logSize = segs.head.size
var indexSize = segs.head.index.sizeInBytes
+ var timeIndexSize = segs.head.timeIndex.sizeInBytes
segs = segs.tail
while(segs.nonEmpty &&
logSize + segs.head.size <= maxSize &&
indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
+ timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
group = segs.head :: group
logSize += segs.head.size
indexSize += segs.head.index.sizeInBytes
+ timeIndexSize += segs.head.timeIndex.sizeInBytes
segs = segs.tail
}
grouped ::= group.reverse
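The cleaner now derives the delete horizon for tombstones from the last cleaned segment's largest message timestamp rather than its file mtime. A small sketch of that computation (values assumed for illustration):

    val deleteRetentionMs = 24L * 60 * 60 * 1000 // e.g. one day
    val largestTimestamp  = 1471625227000L       // hypothetical last cleaned segment
    val deleteHorizonMs   = largestTimestamp - deleteRetentionMs
    // Tombstones in segments whose largestTimestamp <= deleteHorizonMs are discarded.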
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4357ef4..e6c60b9 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -108,7 +108,7 @@ class LogManager(val logDirs: Array[File],
*/
private def loadLogs(): Unit = {
info("Loading logs.")
-
+ val startMs = time.milliseconds
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
@@ -177,7 +177,7 @@ class LogManager(val logDirs: Array[File],
threadPools.foreach(_.shutdown())
}
- info("Logs loading complete.")
+ info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
}
/**
@@ -423,7 +423,7 @@ class LogManager(val logDirs: Array[File],
if (log.config.retentionMs < 0)
return 0
val startMs = time.milliseconds
- log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
+ log.deleteOldSegments(startMs - _.largestTimestamp > log.config.retentionMs)
}
/**
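With this change, time-based retention keys off the segment's largest message timestamp instead of the file's last-modified time, so retention survives file moves and copies. A small sketch of the predicate (retention value assumed for illustration):

    val retentionMs = 7L * 24 * 60 * 60 * 1000 // e.g. one week
    def deletable(now: Long, largestTimestamp: Long): Boolean =
      now - largestTimestamp > retentionMs

    // ~18.8 days old, so the segment is deletable under a one-week retention.
    println(deletable(now = 1471625227000L, largestTimestamp = 1470000000000L)) // true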
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 6bbc50c..d894020 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -36,6 +36,7 @@ import java.io.{IOException, File}
*
* @param log The message set containing log entries
* @param index The offset index
+ * @param timeIndex The timestamp index
* @param baseOffset A lower bound on the offsets in this segment
* @param indexIntervalBytes The approximate number of bytes between entries in the index
* @param time The time instance
@@ -43,6 +44,7 @@ import java.io.{IOException, File}
@nonthreadsafe
class LogSegment(val log: FileMessageSet,
val index: OffsetIndex,
+ val timeIndex: TimeIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
@@ -53,9 +55,17 @@ class LogSegment(val log: FileMessageSet,
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0
+ /* The timestamp we used for time based log rolling */
+ private var rollingBasedTimestamp: Option[Long] = None
+
+ /* The maximum timestamp we see so far */
+ @volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp
+ @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset
+
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+ new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
startOffset,
indexIntervalBytes,
rollJitterMs,
@@ -70,21 +80,33 @@ class LogSegment(val log: FileMessageSet,
*
* It is assumed this method is being called from within a lock.
*
- * @param offset The first offset in the message set.
+ * @param firstOffset The first offset in the message set.
+ * @param largestTimestamp The largest timestamp in the message set.
+ * @param offsetOfLargestTimestamp The offset of the message that has the largest timestamp in the messages to append.
* @param messages The messages to append.
*/
@nonthreadsafe
- def append(offset: Long, messages: ByteBufferMessageSet) {
+ def append(firstOffset: Long, largestTimestamp: Long, offsetOfLargestTimestamp: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
- trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
+ trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at offset %d"
+ .format(messages.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, offsetOfLargestTimestamp))
+ val physicalPosition = log.sizeInBytes()
+ if (physicalPosition == 0)
+ rollingBasedTimestamp = Some(largestTimestamp)
+ // append the messages
+ log.append(messages)
+ // Update the in memory max timestamp and corresponding offset.
+ if (largestTimestamp > maxTimestampSoFar) {
+ maxTimestampSoFar = largestTimestamp
+ offsetOfMaxTimestamp = offsetOfLargestTimestamp
+ }
// append an entry to the index (if needed)
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
- index.append(offset, log.sizeInBytes())
- this.bytesSinceLastIndexEntry = 0
+ index.append(firstOffset, physicalPosition)
+ timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
+ bytesSinceLastIndexEntry = 0
}
- // append the messages
- log.append(messages)
- this.bytesSinceLastIndexEntry += messages.sizeInBytes
+ bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
@@ -97,13 +119,12 @@ class LogSegment(val log: FileMessageSet,
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
* when omitted, the search will begin at the position in the offset index.
- *
* @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
*/
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
val mapping = index.lookup(offset)
- log.searchFor(offset, max(mapping.position, startingFilePosition))
+ log.searchForOffset(offset, max(mapping.position, startingFilePosition))
}
/**
@@ -165,30 +186,34 @@ class LogSegment(val log: FileMessageSet,
*
* @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
* is corrupt.
- *
* @return The number of bytes truncated from the log
*/
@nonthreadsafe
def recover(maxMessageSize: Int): Int = {
index.truncate()
index.resize(index.maxIndexSize)
+ timeIndex.truncate()
+ timeIndex.resize(timeIndex.maxIndexSize)
var validBytes = 0
var lastIndexEntry = 0
val iter = log.iterator(maxMessageSize)
+ maxTimestampSoFar = Message.NoTimestamp
try {
while(iter.hasNext) {
val entry = iter.next
entry.message.ensureValid()
+
+ // The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages.
+ if (entry.message.timestamp > maxTimestampSoFar) {
+ maxTimestampSoFar = entry.message.timestamp
+ offsetOfMaxTimestamp = entry.offset
+ }
+
+ // Build offset index
if(validBytes - lastIndexEntry > indexIntervalBytes) {
- // we need to decompress the message, if required, to get the offset of the first uncompressed message
- val startOffset =
- entry.message.compressionCodec match {
- case NoCompressionCodec =>
- entry.offset
- case _ =>
- ByteBufferMessageSet.deepIterator(entry).next().offset
- }
+ val startOffset = entry.firstOffset
index.append(startOffset, validBytes)
+ timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
lastIndexEntry = validBytes
}
validBytes += MessageSet.entrySize(entry.message)
@@ -200,14 +225,35 @@ class LogSegment(val log: FileMessageSet,
val truncated = log.sizeInBytes - validBytes
log.truncateTo(validBytes)
index.trimToValidSize()
+ // A normally closed segment always appends the largest timestamp ever seen to its time index; we do the same here.
+ timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
+ timeIndex.trimToValidSize()
truncated
}
+ def loadLargestTimestamp(readToLogEnd: Boolean = false) {
+ // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
+ val lastTimeIndexEntry = timeIndex.lastEntry
+ maxTimestampSoFar = lastTimeIndexEntry.timestamp
+ offsetOfMaxTimestamp = lastTimeIndexEntry.offset
+ if (readToLogEnd) {
+ val offsetPosition = index.lookup(lastTimeIndexEntry.offset)
+ // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
+ val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
+ if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
+ maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp
+ offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset
+ }
+ }
+ }
+
+
override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
/**
* Truncate off all index and log entries with offsets >= the given offset.
* If the given offset is larger than the largest message in this segment, do nothing.
+ *
* @param offset The offset to truncate to
* @return The number of log bytes truncated
*/
@@ -217,12 +263,19 @@ class LogSegment(val log: FileMessageSet,
if(mapping == null)
return 0
index.truncateTo(offset)
+ timeIndex.truncateTo(offset)
// after truncation, reset and allocate more space for the (new currently active) index
index.resize(index.maxIndexSize)
+ timeIndex.resize(timeIndex.maxIndexSize)
val bytesTruncated = log.truncateTo(mapping.position)
- if(log.sizeInBytes == 0)
+ if(log.sizeInBytes == 0) {
created = time.milliseconds
+ rollingBasedTimestamp = None
+ }
bytesSinceLastIndexEntry = 0
+ // We may need to reload the max timestamp after truncation.
+ if (maxTimestampSoFar >= 0)
+ loadLargestTimestamp(readToLogEnd = true)
bytesTruncated
}
@@ -251,6 +304,7 @@ class LogSegment(val log: FileMessageSet,
LogFlushStats.logFlushTimer.time {
log.flush()
index.flush()
+ timeIndex.flush()
}
}
@@ -270,27 +324,96 @@ class LogSegment(val log: FileMessageSet,
catch {
case e: IOException => throw kafkaStorageException("index", e)
}
+ try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
+ catch {
+ case e: IOException => throw kafkaStorageException("timeindex", e)
+ }
+ }
+
+ /**
+ * Append the largest time index entry to the time index when this log segment becomes inactive.
+ * This entry will be used to decide when to delete the segment.
+ */
+ def onBecomeInactiveSegment() {
+ timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
+ }
+
+ /**
+ * The time this segment has waited to be rolled. If the first message in the segment does not have a timestamp,
+ * the time is based on the create time of the segment. Otherwise the time is based on the timestamp of that message.
+ */
+ def timeWaitedForRoll(now: Long): Long = {
+ // Load the timestamp of the first message into memory
+ if (rollingBasedTimestamp.isEmpty) {
+ val iter = log.iterator
+ if (iter.hasNext)
+ rollingBasedTimestamp = Some(iter.next.message.timestamp)
+ else
+ // If the log is empty, base the time waited on the segment's create time.
+ return now - created
+ }
+ now - {if (rollingBasedTimestamp.get >= 0) rollingBasedTimestamp.get else created}
+ }
+
+ /**
+ * Search the message offset based on timestamp.
+ * This method returns an optional offset, namely the offset of the first message whose timestamp is
+ * greater than or equal to the target timestamp.
+ *
+ * If all the messages in the segment have smaller timestamps, the returned offset will be the last offset + 1.
+ *
+ * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
+ * the returned offset will be the base offset of the segment.
+ *
+ * This method only returns None when the log is not empty but we did not see any message when scanning
+ * the log from the indexed position. This could happen if the log is truncated after we get the indexed
+ * position but before we scan the log from there. In this case we simply return None and the caller will
+ * need to check on the truncated log and maybe retry, or even do the search on another log segment.
+ *
+ * @param timestamp The timestamp to search for.
+ * @return an offset which points to the first message whose timestamp is greater than or equal to the
+ * target timestamp.
+ * None may be returned when the log is truncated.
+ */
+ def findOffsetByTimestamp(timestamp: Long): Option[Long] = {
+ if (log.end == log.start) {
+ // The log segment is empty; just return the base offset.
+ Some(baseOffset)
+ } else {
+ // Get the index entry with a timestamp less than or equal to the target timestamp
+ val timestampOffset = timeIndex.lookup(timestamp)
+ val position = index.lookup(timestampOffset.offset).position
+ // Search the timestamp
+ log.searchForTimestamp(timestamp, position)
+ }
}
/**
* Close this log segment
*/
def close() {
+ timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
CoreUtils.swallow(index.close)
+ CoreUtils.swallow(timeIndex.close())
CoreUtils.swallow(log.close)
}
/**
* Delete this log segment from the filesystem.
+ *
* @throws KafkaStorageException if the delete fails.
*/
def delete() {
val deletedLog = log.delete()
val deletedIndex = index.delete()
+ val deletedTimeIndex = timeIndex.delete()
if(!deletedLog && log.file.exists)
throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
if(!deletedIndex && index.file.exists)
throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
+ if(!deletedTimeIndex && timeIndex.file.exists)
+ throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
}
/**
@@ -299,10 +422,16 @@ class LogSegment(val log: FileMessageSet,
def lastModified = log.file.lastModified
/**
+ * The largest timestamp this segment contains.
+ */
+ def largestTimestamp = if (maxTimestampSoFar >= 0) maxTimestampSoFar else lastModified
+
+ /**
* Change the last modified time for this log segment
*/
def lastModified_=(ms: Long) = {
log.file.setLastModified(ms)
index.file.setLastModified(ms)
+ timeIndex.file.setLastModified(ms)
}
}
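
To make the lookup path in LogSegment.findOffsetByTimestamp concrete: the time index narrows the search to a starting offset, the offset index maps that offset to a position, and the log is scanned from there. Below is a minimal, self-contained Scala sketch of that flow; all names (TimestampLookupSketch, SketchSegment, Msg) are hypothetical, both index lookups are linear here where the real indexes use binary search, and a "position" is simplified to an index into the message sequence.

object TimestampLookupSketch {
  case class Msg(offset: Long, timestamp: Long)

  case class SketchSegment(baseOffset: Long,
                           timeIndex: Seq[(Long, Long)],  // sparse (timestamp, offset) entries, sorted
                           offsetIndex: Seq[(Long, Int)], // sparse (offset, position) entries, sorted
                           messages: IndexedSeq[Msg]) {

    // Offset for the largest indexed timestamp <= target, falling back to the base offset.
    private def timeLookup(target: Long): Long =
      timeIndex.takeWhile(_._1 <= target).lastOption.map(_._2).getOrElse(baseOffset)

    // Position for the largest indexed offset <= target, falling back to position 0.
    private def positionFor(offset: Long): Int =
      offsetIndex.takeWhile(_._1 <= offset).lastOption.map(_._2).getOrElse(0)

    // First message at or after the indexed position whose timestamp >= target.
    def findOffsetByTimestamp(target: Long): Option[Long] = {
      val startOffset = timeLookup(target)
      val pos = positionFor(startOffset)
      messages.drop(pos).find(_.timestamp >= target).map(_.offset)
    }
  }
}

As in the real code, an empty scan result yields None, which the caller must treat as a possible concurrent truncation.
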
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 848fe3b..ad1b196 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -17,18 +17,11 @@
package kafka.log
-import org.apache.kafka.common.utils.Utils
+import java.io.File
+import java.nio.ByteBuffer
-import scala.math._
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.util.concurrent.locks._
-
-import kafka.utils._
import kafka.utils.CoreUtils.inLock
import kafka.common.InvalidOffsetException
-import sun.nio.ch.DirectBuffer
/**
* An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -55,137 +48,58 @@ import sun.nio.ch.DirectBuffer
* All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
* storage format.
*/
-class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
-
- private val lock = new ReentrantLock
-
- /* initialize the memory mapping for this index */
- @volatile
- private[this] var mmap: MappedByteBuffer = {
- val newlyCreated = _file.createNewFile()
- val raf = new RandomAccessFile(_file, "rw")
- try {
- /* pre-allocate the file if necessary */
- if (newlyCreated) {
- if (maxIndexSize < 8)
- throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
- raf.setLength(roundToExactMultiple(maxIndexSize, 8))
- }
-
- /* memory-map the file */
- val len = raf.length()
- val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-
- /* set the position in the index for the next entry */
- if (newlyCreated)
- idx.position(0)
- else
- // if this is a pre-existing index, assume it is all valid and set position to last entry
- idx.position(roundToExactMultiple(idx.limit, 8))
- idx
- } finally {
- CoreUtils.swallow(raf.close())
- }
- }
+class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1)
+ extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize) {
- /* the number of eight-byte entries currently in the index */
- @volatile
- private[this] var _entries = mmap.position / 8
-
- /* The maximum number of eight-byte entries this index can hold */
- @volatile
- private[this] var _maxEntries = mmap.limit / 8
-
- @volatile
- private[this] var _lastOffset = readLastEntry.offset
+ override def entrySize = 8
+
+ /* the last offset in the index */
+ private[this] var _lastOffset = lastEntry.offset
debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
- .format(_file.getAbsolutePath, _maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
-
- /** The maximum number of entries this index can hold */
- def maxEntries: Int = _maxEntries
-
- /** The last offset in the index */
- def lastOffset: Long = _lastOffset
-
- /** The index file */
- def file: File = _file
+ .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
/**
* The last entry in the index
*/
- def readLastEntry(): OffsetPosition = {
+ private def lastEntry: OffsetPosition = {
inLock(lock) {
_entries match {
case 0 => OffsetPosition(baseOffset, 0)
- case s => OffsetPosition(baseOffset + relativeOffset(mmap, s - 1), physical(mmap, s - 1))
+ case s => parseEntry(mmap, s - 1).asInstanceOf[OffsetPosition]
}
}
}
+ def lastOffset: Long = _lastOffset
+
/**
* Find the largest offset less than or equal to the given targetOffset
* and return a pair holding this offset and its corresponding physical file position.
*
* @param targetOffset The offset to look up.
- *
- * @return The offset found and the corresponding file position for this offset.
- * If the target offset is smaller than the least entry in the index (or the index is empty),
- * the pair (baseOffset, 0) is returned.
+ * @return The offset found and the corresponding file position for this offset
+ * If the target offset is smaller than the least entry in the index (or the index is empty),
+ * the pair (baseOffset, 0) is returned.
*/
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
val idx = mmap.duplicate
- val slot = indexSlotFor(idx, targetOffset)
+ val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
- OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
- }
- }
-
- /**
- * Find the slot in which the largest offset less than or equal to the given
- * target offset is stored.
- *
- * @param idx The index buffer
- * @param targetOffset The offset to look for
- *
- * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
- */
- private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
- // we only store the difference from the base offset so calculate that
- val relOffset = targetOffset - baseOffset
-
- // check if the index is empty
- if (_entries == 0)
- return -1
-
- // check if the target offset is smaller than the least offset
- if (relativeOffset(idx, 0) > relOffset)
- return -1
-
- // binary search for the entry
- var lo = 0
- var hi = _entries - 1
- while (lo < hi) {
- val mid = ceil(hi/2.0 + lo/2.0).toInt
- val found = relativeOffset(idx, mid)
- if (found == relOffset)
- return mid
- else if (found < relOffset)
- lo = mid
- else
- hi = mid - 1
+ parseEntry(idx, slot).asInstanceOf[OffsetPosition]
}
- lo
}
-
- /* return the nth offset relative to the base offset */
- private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
-
- /* return the nth physical position */
- private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
+
+ private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
+
+ private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
+
+ override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
+ OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
+ }
/**
* Get the nth offset mapping from the index
@@ -208,37 +122,25 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
inLock(lock) {
require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
if (_entries == 0 || offset > _lastOffset) {
- debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))
+ debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
mmap.putInt((offset - baseOffset).toInt)
mmap.putInt(position)
_entries += 1
_lastOffset = offset
- require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+ require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
} else {
throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
- .format(offset, _entries, _lastOffset, _file.getAbsolutePath))
+ .format(offset, entries, _lastOffset, file.getAbsolutePath))
}
}
}
-
- /**
- * True iff there are no more slots available in this index
- */
- def isFull: Boolean = _entries >= _maxEntries
-
- /**
- * Truncate the entire index, deleting all entries
- */
- def truncate() = truncateToEntries(0)
-
- /**
- * Remove all entries from the index which have an offset greater than or equal to the given offset.
- * Truncating to an offset larger than the largest in the index has no effect.
- */
- def truncateTo(offset: Long) {
+
+ override def truncate() = truncateToEntries(0)
+
+ override def truncateTo(offset: Long) {
inLock(lock) {
val idx = mmap.duplicate
- val slot = indexSlotFor(idx, offset)
+ val slot = indexSlotFor(idx, offset, IndexSearchType.KEY)
/* There are 3 cases for choosing the new size
* 1) if there is no entry in the index <= the offset, delete everything
@@ -262,139 +164,19 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
private def truncateToEntries(entries: Int) {
inLock(lock) {
_entries = entries
- mmap.position(_entries * 8)
- _lastOffset = readLastEntry.offset
- }
- }
-
- /**
- * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
- * the file.
- */
- def trimToValidSize() {
- inLock(lock) {
- resize(_entries * 8)
+ mmap.position(_entries * entrySize)
+ _lastOffset = lastEntry.offset
}
}
- /**
- * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
- * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
- * loading segments from disk or truncating back to an old segment where a new log segment became active;
- * we want to reset the index size to maximum index size to avoid rolling new segment.
- */
- def resize(newSize: Int) {
- inLock(lock) {
- val raf = new RandomAccessFile(_file, "rw")
- val roundedNewSize = roundToExactMultiple(newSize, 8)
- val position = mmap.position
-
- /* Windows won't let us modify the file length while the file is mmapped :-( */
- if (Os.isWindows)
- forceUnmap(mmap)
- try {
- raf.setLength(roundedNewSize)
- mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
- _maxEntries = mmap.limit / 8
- mmap.position(position)
- } finally {
- CoreUtils.swallow(raf.close())
- }
- }
- }
-
- /**
- * Forcefully free the buffer's mmap. We do this only on windows.
- */
- private def forceUnmap(m: MappedByteBuffer) {
- try {
- m match {
- case buffer: DirectBuffer =>
- val bufferCleaner = buffer.cleaner()
- /* cleaner can be null if the mapped region has size 0 */
- if (bufferCleaner != null)
- bufferCleaner.clean()
- case _ =>
- }
- } catch {
- case t: Throwable => warn("Error when freeing index buffer", t)
- }
- }
-
- /**
- * Flush the data in the index to disk
- */
- def flush() {
- inLock(lock) {
- mmap.force()
- }
- }
-
- /**
- * Delete this index file
- */
- def delete(): Boolean = {
- info("Deleting index " + _file.getAbsolutePath)
- if (Os.isWindows)
- CoreUtils.swallow(forceUnmap(mmap))
- _file.delete()
- }
-
- /** The number of entries in this index */
- def entries = _entries
-
- /**
- * The number of bytes actually used by this index
- */
- def sizeInBytes() = 8 * _entries
-
- /** Close the index */
- def close() {
- trimToValidSize()
- }
-
- /**
- * Rename the file that backs this offset index
- * @throws IOException if rename fails
- */
- def renameTo(f: File) {
- try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
- finally _file = f
- }
-
- /**
- * Do a basic sanity check on this index to detect obvious problems
- * @throws IllegalArgumentException if any problems are found
- */
- def sanityCheck() {
- require(_entries == 0 || lastOffset > baseOffset,
- "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
- .format(_file.getAbsolutePath, lastOffset, baseOffset))
- val len = _file.length()
- require(len % 8 == 0,
- "Index file " + _file.getName + " is corrupt, found " + len +
+ override def sanityCheck() {
+ require(_entries == 0 || _lastOffset > baseOffset,
+ s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
+ s"is ${_lastOffset} which is no larger than the base offset $baseOffset.")
+ val len = file.length()
+ require(len % entrySize == 0,
+ "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
" bytes which is not positive or not a multiple of 8.")
}
-
- /**
- * Round a number to the greatest exact multiple of the given factor less than the given number.
- * E.g. roundToExactMultiple(67, 8) == 64
- */
- private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
-
- /**
- * Execute the given function in a lock only if we are running on windows. We do this
- * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
- * and this requires synchronizing reads.
- */
- private def maybeLock[T](lock: Lock)(fun: => T): T = {
- if(Os.isWindows)
- lock.lock()
- try {
- fun
- } finally {
- if(Os.isWindows)
- lock.unlock()
- }
- }
+
}
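
The binary search deleted above does not go away; per the file summary it moves into the new AbstractIndex.scala, where indexSlotFor is shared by both index types (the IndexSearchType argument selects whether to compare keys or values). A standalone sketch of that slot search over fixed-size 8-byte entries, with hypothetical names, might look like this:

import java.nio.ByteBuffer

object IndexSlotSketch {
  val EntrySize = 8 // 4-byte relative offset + 4-byte physical position

  private def relativeOffset(buf: ByteBuffer, n: Int): Int = buf.getInt(n * EntrySize)

  /** Largest slot whose relative offset is <= relTarget, or -1 if none. */
  def indexSlotFor(buf: ByteBuffer, entries: Int, relTarget: Int): Int = {
    if (entries == 0 || relativeOffset(buf, 0) > relTarget)
      return -1
    var lo = 0
    var hi = entries - 1
    while (lo < hi) {
      // Bias the midpoint upward so the loop still makes progress when lo = mid is taken.
      val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
      val found = relativeOffset(buf, mid)
      if (found == relTarget) return mid
      else if (found < relTarget) lo = mid
      else hi = mid - 1
    }
    lo
  }
}
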
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/OffsetPosition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetPosition.scala b/core/src/main/scala/kafka/log/OffsetPosition.scala
deleted file mode 100644
index 24b6dcf..0000000
--- a/core/src/main/scala/kafka/log/OffsetPosition.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-/**
- * The mapping between a logical log offset and the physical position
- * in some log file of the beginning of the message set entry with the
- * given offset.
- */
-case class OffsetPosition(offset: Long, position: Int)
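
OffsetPosition itself survives: per the file summary it moves into the new core/src/main/scala/kafka/log/IndexEntry.scala (not shown in this part of the diff), next to the TimestampOffset type used by the time index. Judging from how parseEntry returns IndexEntry above, the hierarchy is presumably along these lines; this is a sketch, not the exact file contents:

sealed trait IndexEntry {
  // The key and value stored in a fixed-size index slot.
  def indexKey: Long
  def indexValue: Long
}

/** Maps a logical log offset to the physical position of that offset in the log file. */
case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
  override def indexKey = offset
  override def indexValue = position.toLong
}

/** Maps a timestamp to the offset of the last message with a smaller-or-equal timestamp. */
case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
  override def indexKey = timestamp
  override def indexValue = offset
}
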
http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
new file mode 100644
index 0000000..7f24081
--- /dev/null
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.ByteBuffer
+
+import kafka.common.InvalidOffsetException
+import kafka.message.Message
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+
+/**
+ * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
+ * sparse, i.e. it may not hold an entry for all the messages in the segment.
+ *
+ * The index is stored in a file that is preallocated to hold a fixed maximum number of 12-byte time index entries.
+ * The file format is a series of time index entries. The physical format of each entry is an 8-byte timestamp followed
+ * by a 4-byte "relative" offset, as used in the [[OffsetIndex]]. A time index entry (TIMESTAMP, OFFSET) means that
+ * the biggest timestamp seen before OFFSET is TIMESTAMP, i.e. any message whose timestamp is greater than TIMESTAMP
+ * must come after OFFSET.
+ *
+ * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
+ * storage format.
+ *
+ * The timestamps in the same time index file are guaranteed to be monotonically increasing.
+ *
+ * The index supports timestamp lookups against a memory map of this file. A lookup does a binary search to find
+ * the offset of the message whose indexed timestamp is the largest one that is less than or equal to the target timestamp.
+ *
+ * Time index files can be opened in two ways: either as an empty, mutable index that allows appends, or as an
+ * immutable read-only index file that has previously been populated. Trimming the index to its valid size turns a
+ * mutable file into an immutable one and truncates off any extra bytes. This is done when the index file is rolled over.
+ *
+ * No attempt is made to checksum the contents of this file; in the event of a crash it is rebuilt.
+ *
+ */
+class TimeIndex(file: File,
+ baseOffset: Long,
+ maxIndexSize: Int = -1)
+ extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize) with Logging {
+
+ override def entrySize = 12
+
+ // We override the full check to reserve the last time index entry slot for the entry appended on roll or close.
+ override def isFull: Boolean = entries >= maxEntries - 1
+
+ private def timestamp(buffer: ByteBuffer, n: Int): Long = buffer.getLong(n * entrySize)
+
+ private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 8)
+
+ /**
+ * The last entry in the index
+ */
+ def lastEntry: TimestampOffset = {
+ inLock(lock) {
+ _entries match {
+ case 0 => TimestampOffset(Message.NoTimestamp, baseOffset)
+ case s => parseEntry(mmap, s - 1).asInstanceOf[TimestampOffset]
+ }
+ }
+ }
+
+ /**
+ * Get the nth timestamp mapping from the time index
+ * @param n The entry number in the time index
+ * @return The timestamp/offset pair at that entry
+ */
+ def entry(n: Int): TimestampOffset = {
+ maybeLock(lock) {
+ if(n >= _entries)
+ throw new IllegalArgumentException("Attempt to fetch the %dth entry from a time index of size %d.".format(n, _entries))
+ val idx = mmap.duplicate
+ TimestampOffset(timestamp(idx, n), relativeOffset(idx, n))
+ }
+ }
+
+ override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
+ TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n))
+ }
+
+ /**
+ * Attempt to append a time index entry to the time index.
+ * The new entry is appended only if both the timestamp and the offset are greater than those of the last
+ * appended entry.
+ *
+ * @param timestamp The timestamp of the new time index entry
+ * @param offset The offset of the new time index entry
+ * @param skipFullCheck Whether to skip checking if the time index is full. We only skip the check when the
+ * segment is rolled or closed.
+ */
+ def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
+ inLock(lock) {
+ if (!skipFullCheck)
+ require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
+ // We do not throw an exception when the offset equals the offset of the last entry, i.e. when we are trying
+ // to insert the same time index entry as the last one. If the time index entry to be inserted is the same as
+ // the last entry, we simply ignore the insertion because that can happen in the following two scenarios:
+ // 1. A log segment is closed.
+ // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
+ if (_entries != 0 && offset < lastEntry.offset)
+ throw new InvalidOffsetException("Attempt to append an offset (%d) to slot %d no larger than the last offset appended (%d) to %s."
+ .format(offset, _entries, lastEntry.offset, file.getAbsolutePath))
+ if (_entries != 0 && timestamp < lastEntry.timestamp)
+ throw new IllegalStateException("Attempt to append a timestamp (%d) to slot %d no larger than the last timestamp appended (%d) to %s."
+ .format(timestamp, _entries, lastEntry.timestamp, file.getAbsolutePath))
+ // We only append to the time index when the timestamp is greater than the last inserted timestamp.
+ // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
+ // index will be empty.
+ if (timestamp > lastEntry.timestamp) {
+ debug("Adding index entry %d => %d to %s.".format(timestamp, offset, file.getName))
+ mmap.putLong(timestamp)
+ mmap.putInt((offset - baseOffset).toInt)
+ _entries += 1
+ require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+ }
+ }
+ }
+
+ /**
+ * Find the time index entry whose timestamp is the largest one that is less than or equal to the given timestamp.
+ * If the target timestamp is smaller than the least timestamp in the time index, (NoTimestamp, baseOffset) is
+ * returned.
+ *
+ * @param targetTimestamp The timestamp to look up.
+ * @return The time index entry found.
+ */
+ def lookup(targetTimestamp: Long): TimestampOffset = {
+ maybeLock(lock) {
+ val idx = mmap.duplicate
+ val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
+ if (slot == -1)
+ TimestampOffset(Message.NoTimestamp, baseOffset)
+ else {
+ val entry = parseEntry(idx, slot).asInstanceOf[TimestampOffset]
+ TimestampOffset(entry.timestamp, entry.offset)
+ }
+ }
+ }
+
+ override def truncate() = truncateToEntries(0)
+
+ /**
+ * Remove all entries from the index which have an offset greater than or equal to the given offset.
+ * Truncating to an offset larger than the largest in the index has no effect.
+ */
+ override def truncateTo(offset: Long) {
+ inLock(lock) {
+ val idx = mmap.duplicate
+ val slot = indexSlotFor(idx, offset, IndexSearchType.VALUE)
+
+ /* There are 3 cases for choosing the new size
+ * 1) if there is no entry in the index <= the offset, delete everything
+ * 2) if there is an entry for this exact offset, delete it and everything larger than it
+ * 3) if there is no entry for this offset, delete everything larger than the next smallest
+ */
+ val newEntries =
+ if(slot < 0)
+ 0
+ else if(relativeOffset(idx, slot) == offset - baseOffset)
+ slot
+ else
+ slot + 1
+ truncateToEntries(newEntries)
+ }
+ }
+
+ /**
+ * Truncates index to a known number of entries.
+ */
+ private def truncateToEntries(entries: Int) {
+ inLock(lock) {
+ _entries = entries
+ mmap.position(_entries * entrySize)
+ }
+ }
+
+ override def sanityCheck() {
+ val entry = lastEntry
+ val lastTimestamp = entry.timestamp
+ val lastOffset = entry.offset
+ require(_entries == 0 || (lastTimestamp >= timestamp(mmap, 0)),
+ s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last timestamp " +
+ s"is $lastTimestamp which is no larger than the first timestamp ${timestamp(mmap, 0)}")
+ require(_entries == 0 || lastOffset >= baseOffset,
+ s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
+ s"is $lastOffset which is smaller than the first offset $baseOffset")
+ val len = file.length()
+ require(len % entrySize == 0,
+ "Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
+ " bytes which is not positive or not a multiple of 12.")
+ }
+}
\ No newline at end of file
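
To illustrate the 12-byte entry layout described in the class comment, an 8-byte timestamp followed by a 4-byte offset relative to the segment's base offset, here is a small self-contained sketch; the names are hypothetical and it is not the patch's API:

import java.nio.ByteBuffer

object TimeIndexEntrySketch {
  val EntrySize = 12 // 8-byte timestamp + 4-byte relative offset

  def append(buf: ByteBuffer, baseOffset: Long, timestamp: Long, offset: Long): Unit = {
    buf.putLong(timestamp)                  // 8 bytes
    buf.putInt((offset - baseOffset).toInt) // 4 bytes, offset relative to the base offset
  }

  def read(buf: ByteBuffer, baseOffset: Long, n: Int): (Long, Long) = {
    val ts  = buf.getLong(n * EntrySize)
    val off = baseOffset + buf.getInt(n * EntrySize + 8)
    (ts, off)
  }

  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.allocate(2 * EntrySize)
    append(buf, baseOffset = 1000L, timestamp = 1471620000000L, offset = 1042L)
    println(read(buf, 1000L, 0)) // prints (1471620000000,1042)
  }
}

Storing the offset relative to the base offset is what keeps each entry at 12 bytes: a 4-byte delta suffices because a single segment never spans more than Int.MaxValue offsets.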