Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/29 04:36:20 UTC
[3/3] git commit: KAFKA-631 Implement a log cleaner for Kafka.
Reviewed by Neha.
Updated Branches:
refs/heads/trunk 92f177b30 -> e7edb5e1e
KAFKA-631 Implement a log cleaner for Kafka. Reviewed by Neha.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e7edb5e1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e7edb5e1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e7edb5e1
Branch: refs/heads/trunk
Commit: e7edb5e1e933f5535378d546bcf4d8b178d2e69c
Parents: 92f177b
Author: Jay Kreps <ja...@gmail.com>
Authored: Mon Jan 28 19:31:17 2013 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Mon Jan 28 19:31:17 2013 -0800
----------------------------------------------------------------------
config/log4j.properties | 15 +-
config/server.properties | 1 +
core/src/main/scala/kafka/cluster/Partition.scala | 8 +-
.../common/OptimisticLockFailureException.scala | 23 +
core/src/main/scala/kafka/log/CleanerConfig.scala | 41 ++
core/src/main/scala/kafka/log/FileMessageSet.scala | 30 +-
core/src/main/scala/kafka/log/Log.scala | 265 +++++---
core/src/main/scala/kafka/log/LogCleaner.scala | 557 +++++++++++++++
core/src/main/scala/kafka/log/LogConfig.scala | 51 ++
core/src/main/scala/kafka/log/LogManager.scala | 120 ++--
core/src/main/scala/kafka/log/LogSegment.scala | 24 +-
core/src/main/scala/kafka/log/OffsetIndex.scala | 33 +-
core/src/main/scala/kafka/log/OffsetMap.scala | 136 ++++
.../scala/kafka/message/ByteBufferMessageSet.scala | 4 +-
.../kafka/server/HighwaterMarkCheckpoint.scala | 118 ---
core/src/main/scala/kafka/server/KafkaConfig.scala | 33 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 51 ++-
.../main/scala/kafka/server/OffsetCheckpoint.scala | 103 +++
.../main/scala/kafka/server/ReplicaManager.scala | 14 +-
core/src/main/scala/kafka/utils/FileLock.scala | 94 ++--
core/src/main/scala/kafka/utils/Logging.scala | 5 +-
core/src/main/scala/kafka/utils/Throttler.scala | 26 +-
core/src/main/scala/kafka/utils/Utils.scala | 36 +-
.../scala/kafka/utils/VerifiableProperties.scala | 35 +-
.../src/test/scala/other/kafka/StressTestLog.scala | 12 +-
.../test/scala/other/kafka/TestLogCleaning.scala | 216 ++++++
.../scala/other/kafka/TestLogPerformance.scala | 3 +-
.../test/scala/unit/kafka/log/CleanerTest.scala | 227 ++++++
.../unit/kafka/log/LogCleanerIntegrationTest.scala | 117 +++
.../test/scala/unit/kafka/log/LogManagerTest.scala | 57 +-
.../test/scala/unit/kafka/log/LogSegmentTest.scala | 15 +
core/src/test/scala/unit/kafka/log/LogTest.scala | 139 ++--
.../test/scala/unit/kafka/log/OffsetMapTest.scala | 87 +++
.../server/HighwatermarkPersistenceTest.scala | 16 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 2 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 48 +-
.../scala/unit/kafka/server/SimpleFetchTest.scala | 2 -
.../scala/unit/kafka/utils/MockScheduler.scala | 32 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 11 +-
.../test/scala/unit/kafka/utils/UtilsTest.scala | 20 +
40 files changed, 2280 insertions(+), 547 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index 00f891c..b36d3e0 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -36,6 +36,12 @@ log4j.appender.requestAppender.File=kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
# Turn on all our debugging info
#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
@@ -44,13 +50,18 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
log4j.logger.kafka=INFO
-log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
#log4j.additivity.kafka.server.KafkaApis=false
-log4j.logger.kafka.request.logger=TRACE, requestAppender
+log4j.logger.kafka.request.logger=WARN, requestAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.controller=TRACE, stateChangeAppender
log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.Cleaner=false
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 04408dd..5a16caf 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -114,3 +114,4 @@ kafka.csv.metrics.dir=/tmp/kafka_metrics
# Disable csv reporting by default.
kafka.csv.metrics.reporter.enabled=false
+log.cleanup.policy=delete
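As a point of reference, the value added above keeps the existing time/size based deletion. Assuming the deduplicating policy introduced by this patch is selected with the value "dedupe" in this version (it corresponds to the dedupe flag in LogConfig further down), the broker-wide alternative would look like:

  # keep only the latest message for each key instead of deleting whole segments by age/size
  log.cleanup.policy=dedupe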
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 71eb980..af80631 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,7 +23,7 @@ import kafka.api.LeaderAndIsr
import kafka.server.ReplicaManager
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
-import kafka.common.ErrorMapping
+import kafka.common._
import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
@@ -75,11 +75,11 @@ class Partition(val topic: String,
case None =>
if (isReplicaLocal(replicaId)) {
val log = logManager.getOrCreateLog(topic, partitionId)
- val offset = replicaManager.highWatermarkCheckpoints(log.dir.getParent).read(topic, partitionId).min(log.logEndOffset)
+ val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
+ val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
val localReplica = new Replica(replicaId, this, time, offset, Some(log))
addReplicaIfNotExists(localReplica)
- }
- else {
+ } else {
val remoteReplica = new Replica(replicaId, this, time)
addReplicaIfNotExists(remoteReplica)
}
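The high watermark is now read through the new OffsetCheckpoint class (added later in this patch), which stores a whole Map[TopicAndPartition, Long] per data directory instead of one value per topic/partition. A minimal sketch of the read-modify-write pattern it supports, with an illustrative file path:

  import java.io.File
  import kafka.common.TopicAndPartition
  import kafka.server.OffsetCheckpoint

  val checkpoint = new OffsetCheckpoint(new File("/tmp/kafka-logs/replication-offset-checkpoint"))
  val tp = TopicAndPartition("my-topic", 0)

  // read() returns the whole map; partitions with no entry default to offset 0
  val hw = checkpoint.read().getOrElse(tp, 0L)

  // write() persists an updated map back to disk
  checkpoint.write(checkpoint.read() + (tp -> (hw + 1)))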
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala b/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
new file mode 100644
index 0000000..0e69110
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when an optimistic locking attempt receives concurrent modifications
+ */
+class OptimisticLockFailureException(message: String) extends RuntimeException(message)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/CleanerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
new file mode 100644
index 0000000..999fee8
--- /dev/null
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+/**
+ * Configuration parameters for the log cleaner
+ *
+ * @param numThreads The number of cleaner threads to run
+ * @param dedupeBufferSize The total memory used for log deduplication
+ * @param dedupeBufferLoadFactor The maximum percent full for the deduplication buffer
+ * @param maxMessageSize The maximum size of a message that can appear in the log
+ * @param maxIoBytesPerSecond The maximum read and write I/O that all cleaner threads are allowed to do
+ * @param backOffMs The amount of time to wait before rechecking if no logs are eligible for cleaning
+ * @param enableCleaner Allows completely disabling the log cleaner
+ * @param hashAlgorithm The hash algorithm to use in key comparison.
+ */
+case class CleanerConfig(val numThreads: Int = 1,
+ val dedupeBufferSize: Int = 4*1024*1024,
+ val dedupeBufferLoadFactor: Double = 0.75,
+ val ioBufferSize: Int = 1024*1024,
+ val maxMessageSize: Int = 32*1024*1024,
+ val maxIoBytesPerSecond: Double = Double.MaxValue,
+ val backOffMs: Long = 60 * 1000,
+ val enableCleaner: Boolean = true,
+ val hashAlgorithm: String = "MD5") {
+}
\ No newline at end of file
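A minimal sketch of constructing a non-default cleaner configuration from the parameters documented above; the values are illustrative, not recommendations:

  import kafka.log.CleanerConfig

  val cleanerConfig = CleanerConfig(numThreads = 2,                     // two cleaner threads
                                    dedupeBufferSize = 32*1024*1024,    // 32 MB of dedupe map memory, shared across threads
                                    maxIoBytesPerSecond = 10*1024*1024, // throttle total cleaner I/O to ~10 MB/sec
                                    backOffMs = 15*1000)                // recheck for dirty logs every 15 seconds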
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 37e8d87..a74abfe 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -38,7 +38,7 @@ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
* @param isSlice Should the start and end parameters be used for slicing?
*/
@nonthreadsafe
-class FileMessageSet private[kafka](val file: File,
+class FileMessageSet private[kafka](@volatile var file: File,
private[log] val channel: FileChannel,
private[log] val start: Int,
private[log] val end: Int,
@@ -223,14 +223,36 @@ class FileMessageSet private[kafka](val file: File,
* Truncate this file message set to the given size in bytes. Note that this API does no checking that the
* given size falls on a valid message boundary.
* @param targetSize The size to truncate to.
+ * @return The number of bytes truncated off
*/
- def truncateTo(targetSize: Int) = {
- if(targetSize > sizeInBytes || targetSize < 0)
+ def truncateTo(targetSize: Int): Int = {
+ val originalSize = sizeInBytes
+ if(targetSize > originalSize || targetSize < 0)
throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
- " size of this log segment is " + sizeInBytes + " bytes.")
+ " size of this log segment is " + originalSize + " bytes.")
channel.truncate(targetSize)
channel.position(targetSize)
_size.set(targetSize)
+ originalSize - targetSize
+ }
+
+ /**
+ * Read from the underlying file into the buffer starting at the given position
+ */
+ def readInto(buffer: ByteBuffer, position: Int): ByteBuffer = {
+ channel.read(buffer, position)
+ buffer.flip()
+ buffer
+ }
+
+ /**
+ * Rename the file that backs this message set
+ * @return true iff the rename was successful
+ */
+ def renameTo(f: File): Boolean = {
+ val success = this.file.renameTo(f)
+ this.file = f
+ success
}
}
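The new readInto and renameTo methods are what the cleaner (added below) uses to scan segments and swap files. A minimal sketch of a scan loop over an existing FileMessageSet named messageSet, assuming the buffer is large enough for at least one message (the cleaner grows its buffers when it is not):

  import java.nio.ByteBuffer
  import kafka.message.{ByteBufferMessageSet, MessageSet}

  val buffer = ByteBuffer.allocate(1024*1024)
  var position = 0
  while(position < messageSet.sizeInBytes) {
    buffer.clear()
    // fills the buffer from the given file position and flips it for reading
    val messages = new ByteBufferMessageSet(messageSet.readInto(buffer, position))
    for(entry <- messages)
      position += MessageSet.entrySize(entry.message)
  }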
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5ea9489..ac12b74 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -49,15 +49,9 @@ import com.yammer.metrics.core.Gauge
*/
@threadsafe
class Log(val dir: File,
+ val config: LogConfig,
+ val needsRecovery: Boolean,
val scheduler: Scheduler,
- val maxSegmentSize: Int,
- val maxMessageSize: Int,
- val flushInterval: Int = Int.MaxValue,
- val rollIntervalMs: Long = Long.MaxValue,
- val needsRecovery: Boolean,
- val maxIndexSize: Int = (10*1024*1024),
- val indexIntervalBytes: Int = 4096,
- val segmentDeleteDelayMs: Long = 60000,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
@@ -73,6 +67,9 @@ class Log(val dir: File,
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[Long,LogSegment] = loadSegments()
+
+ /* The number of times the log has been truncated */
+ private val truncates = new AtomicInteger(0)
/* Calculate the offset of the next message */
private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset())
@@ -90,58 +87,82 @@ class Log(val dir: File,
private def loadSegments(): ConcurrentNavigableMap[Long, LogSegment] = {
// open all the segments read-only
val logSegments = new ConcurrentSkipListMap[Long, LogSegment]
- val ls = dir.listFiles()
- if(ls != null) {
- for(file <- ls if file.isFile) {
- if(!file.canRead)
- throw new IOException("Could not read file " + file)
- val filename = file.getName
- if(filename.endsWith(DeletedFileSuffix)) {
- // if the file ends in .deleted, delete it
- val deleted = file.delete()
- if(!deleted)
- warn("Attempt to delete defunct segment file %s failed.".format(filename))
- } else if(filename.endsWith(IndexFileSuffix)) {
- // if it is an index file, make sure it has a corresponding .log file
- val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
- if(!logFile.exists) {
- warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
- file.delete()
- }
- } else if(filename.endsWith(LogFileSuffix)) {
- // if its a log file, load the corresponding log segment
- val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
- val hasIndex = Log.indexFilename(dir, start).exists
- val segment = new LogSegment(dir = dir,
- startOffset = start,
- indexIntervalBytes = indexIntervalBytes,
- maxIndexSize = maxIndexSize)
- if(!hasIndex) {
- // this can only happen if someone manually deletes the index file
- error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
- segment.recover(maxMessageSize)
- }
- logSegments.put(start, segment)
+
+ // create the log directory if it doesn't exist
+ dir.mkdirs()
+
+ // first do a pass through the files in the log directory and remove any temporary files
+ // and complete any interrupted swap operations
+ for(file <- dir.listFiles if file.isFile) {
+ if(!file.canRead)
+ throw new IOException("Could not read file " + file)
+ val filename = file.getName
+ if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
+ // if the file ends in .deleted or .cleaned, delete it
+ file.delete()
+ } else if(filename.endsWith(SwapFileSuffix)) {
+ // we crashed in the middle of a swap operation, to recover:
+ // if a log, swap it in and delete the .index file
+ // if an index just delete it, it will be rebuilt
+ val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
+ if(baseName.getPath.endsWith(IndexFileSuffix)) {
+ file.delete()
+ } else if(baseName.getPath.endsWith(LogFileSuffix)){
+ // delete the index
+ val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
+ index.delete()
+ // complete the swap operation
+ val renamed = file.renameTo(baseName)
+ if(renamed)
+ info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
+ else
+ throw new KafkaException("Failed to rename file %s.".format(file.getPath))
}
}
}
+ // now do a second pass and load all the .log and .index files
+ for(file <- dir.listFiles if file.isFile) {
+ val filename = file.getName
+ if(filename.endsWith(IndexFileSuffix)) {
+ // if it is an index file, make sure it has a corresponding .log file
+ val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+ if(!logFile.exists) {
+ warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
+ file.delete()
+ }
+ } else if(filename.endsWith(LogFileSuffix)) {
+ // if its a log file, load the corresponding log segment
+ val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
+ val hasIndex = Log.indexFilename(dir, start).exists
+ val segment = new LogSegment(dir = dir,
+ startOffset = start,
+ indexIntervalBytes = config.indexInterval,
+ maxIndexSize = config.maxIndexSize)
+ if(!hasIndex) {
+ error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
+ segment.recover(config.maxMessageSize)
+ }
+ logSegments.put(start, segment)
+ }
+ }
+
if(logSegments.size == 0) {
// no existing segments, create a new mutable segment beginning at offset 0
logSegments.put(0,
new LogSegment(dir = dir,
startOffset = 0,
- indexIntervalBytes = indexIntervalBytes,
- maxIndexSize = maxIndexSize))
+ indexIntervalBytes = config.indexInterval,
+ maxIndexSize = config.maxIndexSize))
} else {
// reset the index size of the currently active log segment to allow more entries
val active = logSegments.lastEntry.getValue
- active.index.resize(maxIndexSize)
+ active.index.resize(config.maxIndexSize)
// run recovery on the active segment if necessary
if(needsRecovery) {
info("Recovering active segment of %s.".format(name))
- active.recover(maxMessageSize)
+ active.recover(config.maxMessageSize)
}
}
logSegments
@@ -152,6 +173,11 @@ class Log(val dir: File,
* Take care! this is an O(n) operation.
*/
def numberOfSegments: Int = segments.size
+
+ /**
+ * The number of truncates that have occurred since the log was opened.
+ */
+ def numberOfTruncates: Int = truncates.get
/**
* Close this log
@@ -194,22 +220,24 @@ class Log(val dir: File,
val segment = maybeRoll()
if(assignOffsets) {
- // assign offsets to the messageset
+ // assign offsets to the messageset
appendInfo.firstOffset = nextOffset.get
- val offsetCounter = new AtomicLong(nextOffset.get)
- validMessages = validMessages.assignOffsets(offsetCounter, appendInfo.codec)
- appendInfo.lastOffset = offsetCounter.get - 1
+ val offset = new AtomicLong(nextOffset.get)
+ validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
+ appendInfo.lastOffset = offset.get - 1
} else {
// we are taking the offsets we are given
if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffset.get)
- throw new IllegalArgumentException("Out of order offsets found in " + messages)
+ throw new IllegalArgumentException("Out of order offsets found in " + messages)
}
// now append to the log
trace("Appending message set to %s with offsets %d to %d.".format(name, appendInfo.firstOffset, appendInfo.lastOffset))
segment.append(appendInfo.firstOffset, validMessages)
+
+ // increment the log end offset
nextOffset.set(appendInfo.lastOffset + 1)
-
+
// maybe flush the log and index
maybeFlush(appendInfo.count)
@@ -263,8 +291,8 @@ class Log(val dir: File,
// check the validity of the message by checking CRC and message size
val m = messageAndOffset.message
m.ensureValid()
- if(MessageSet.entrySize(m) > maxMessageSize)
- throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
+ if(MessageSet.entrySize(m) > config.maxMessageSize)
+ throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), config.maxMessageSize))
messageCount += 1;
@@ -372,18 +400,21 @@ class Log(val dir: File,
*/
private def maybeRoll(): LogSegment = {
val segment = activeSegment
- if (segment.size > maxSegmentSize) {
- info("Rolling %s due to full data log".format(name))
- roll()
- } else if (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) {
- info("Rolling %s due to time based rolling".format(name))
- roll()
- } else if (segment.index.isFull) {
- info("Rolling %s due to full index maxIndexSize = %d, entries = %d, maxEntries = %d"
- .format(name, segment.index.maxIndexSize, segment.index.entries(), segment.index.maxEntries))
+ if (segment.size > config.segmentSize ||
+ segment.size > 0 && time.milliseconds - segment.created > config.segmentMs ||
+ segment.index.isFull) {
+ debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
+ .format(name,
+ segment.size,
+ config.segmentSize,
+ segment.index.entries,
+ segment.index.maxEntries,
+ time.milliseconds - segment.created,
+ config.segmentMs))
roll()
- } else
+ } else {
segment
+ }
}
/**
@@ -412,11 +443,11 @@ class Log(val dir: File,
}
val segment = new LogSegment(dir,
startOffset = newOffset,
- indexIntervalBytes = indexIntervalBytes,
- maxIndexSize = maxIndexSize)
- val prev = segments.put(segment.baseOffset, segment)
+ indexIntervalBytes = config.indexInterval,
+ maxIndexSize = config.maxIndexSize)
+ val prev = addSegment(segment)
if(prev != null)
- throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(dir.getName, newOffset))
+ throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
segment
}
}
@@ -426,12 +457,12 @@ class Log(val dir: File,
* @param numberOfMessages The number of messages that are being appended
*/
private def maybeFlush(numberOfMessages : Int) {
- if(unflushed.addAndGet(numberOfMessages) >= flushInterval)
+ if(unflushed.addAndGet(numberOfMessages) >= config.flushInterval)
flush()
}
/**
- * Flush this log file and assoicated index to the physical disk
+ * Flush this log file and associated index to the physical disk
*/
def flush() : Unit = {
if (unflushed.get == 0)
@@ -475,6 +506,7 @@ class Log(val dir: File,
activeSegment.truncateTo(targetOffset)
this.nextOffset.set(targetOffset)
}
+ truncates.getAndIncrement
}
}
@@ -487,12 +519,12 @@ class Log(val dir: File,
lock synchronized {
val segmentsToDelete = logSegments.toList
segmentsToDelete.foreach(deleteSegment(_))
- segments.put(newOffset,
- new LogSegment(dir,
- newOffset,
- indexIntervalBytes = indexIntervalBytes,
- maxIndexSize = maxIndexSize))
+ addSegment(new LogSegment(dir,
+ newOffset,
+ indexIntervalBytes = config.indexInterval,
+ maxIndexSize = config.maxIndexSize))
this.nextOffset.set(newOffset)
+ truncates.getAndIncrement
}
}
@@ -511,7 +543,13 @@ class Log(val dir: File,
*/
def logSegments: Iterable[LogSegment] = asIterable(segments.values)
- override def toString() = "Log(" + this.dir + ")"
+ /**
+ * Get all segments beginning with the segment that includes "from" and ending with the segment
+ * that includes up to "to-1" or the end of the log (if to > logEndOffset)
+ */
+ def logSegments(from: Long, to: Long) = asIterable(segments.subMap(from, true, to, false).values)
+
+ override def toString() = "Log(" + dir + ")"
/**
* This method performs an asynchronous log segment delete by doing the following:
@@ -526,37 +564,82 @@ class Log(val dir: File,
* @param segment The log segment to schedule for deletion
*/
private def deleteSegment(segment: LogSegment) {
- info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, dir.getName))
+ info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
lock synchronized {
segments.remove(segment.baseOffset)
- val deletedLog = new File(segment.log.file.getPath + Log.DeletedFileSuffix)
- val deletedIndex = new File(segment.index.file.getPath + Log.DeletedFileSuffix)
- val renamedLog = segment.log.file.renameTo(deletedLog)
- val renamedIndex = segment.index.file.renameTo(deletedIndex)
- if(!renamedLog && segment.log.file.exists)
- throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.log.file.getPath, deletedLog.getPath, name))
- if(!renamedIndex && segment.index.file.exists)
- throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.index.file.getPath, deletedIndex.getPath, name))
- def asyncDeleteFiles() {
- info("Deleting log segment %s for log %s.".format(segment.baseOffset, name))
- if(!deletedLog.delete())
- warn("Failed to delete log segment file %s for log %s.".format(deletedLog.getPath, name))
- if(!deletedIndex.delete())
- warn("Failed to delete index segment file %s for log %s.".format(deletedLog.getPath, name))
- }
- scheduler.schedule("delete-log-segment", asyncDeleteFiles, delay = segmentDeleteDelayMs)
+ asyncDeleteSegment(segment)
+ }
+ }
+
+ /**
+ * Rename the given segment's files with the .deleted suffix and schedule them for asynchronous deletion
+ * @throws KafkaStorageException if the file can't be renamed and still exists
+ */
+ private def asyncDeleteSegment(segment: LogSegment) {
+ segment.changeFileSuffixes("", Log.DeletedFileSuffix)
+ def deleteSeg() {
+ info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
+ segment.delete()
}
+ scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
+ }
+
+ /**
+ * Swap a new segment in place and delete one or more existing segments in a crash-safe manner. The old segments will
+ * be asynchronously deleted.
+ *
+ * @param newSegment The new log segment to add to the log
+ * @param oldSegments The old log segments to delete from the log
+ */
+ private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], expectedTruncates: Int) {
+ lock synchronized {
+ if(expectedTruncates != numberOfTruncates)
+ throw new OptimisticLockFailureException("The log has been truncated, expected %d but found %d.".format(expectedTruncates, numberOfTruncates))
+ // need to do this in two phases to be crash safe AND do the delete asynchronously
+ // if we crash in the middle of this we complete the swap in loadSegments()
+ newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
+ addSegment(newSegment)
+
+ // delete the old files
+ for(seg <- oldSegments) {
+ // remove the index entry
+ if(seg.baseOffset != newSegment.baseOffset)
+ segments.remove(seg.baseOffset)
+ // delete segment
+ asyncDeleteSegment(seg)
+ }
+ // okay we are safe now, remove the swap suffix
+ newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
+ }
}
+ /**
+ * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
+ * @param segment The segment to add
+ */
+ def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
+
}
/**
* Helper functions for logs
*/
object Log {
+
+ /** a log file */
val LogFileSuffix = ".log"
+
+ /** an index file */
val IndexFileSuffix = ".index"
+
+ /** a file that is scheduled to be deleted */
val DeletedFileSuffix = ".deleted"
+
+ /** A temporary file that is being used for log cleaning */
+ val CleanedFileSuffix = ".cleaned"
+
+ /** A temporary file used when swapping files into the log */
+ val SwapFileSuffix = ".swap"
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
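The new .cleaned, .swap and .deleted suffixes give replaceSegments a crash-safe sequence: the cleaned copy is written as .cleaned, renamed to .swap, the old segments are scheduled for deletion as .deleted, and finally the .swap suffix is stripped; loadSegments() completes or discards whichever step was interrupted. Because replaceSegments checks numberOfTruncates, a caller must capture the truncate count before it starts copying. A minimal sketch of that calling pattern (the method is package-private to kafka.log, and the copy step is elided):

  import kafka.common.OptimisticLockFailureException

  val expectedTruncates = log.numberOfTruncates
  // ... copy the retained messages from oldSegments into newSegment here ...
  try {
    log.replaceSegments(newSegment, oldSegments, expectedTruncates)
  } catch {
    case e: OptimisticLockFailureException =>
      newSegment.delete()   // the log was truncated while we were copying; discard our work
  }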
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
new file mode 100644
index 0000000..368a12b
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -0,0 +1,557 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import scala.collection._
+import scala.math
+import java.nio._
+import java.util.concurrent.Semaphore
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic._
+import java.io.File
+import kafka.common._
+import kafka.message._
+import kafka.server.OffsetCheckpoint
+import kafka.utils._
+
+/**
+ * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
+ * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
+ *
+ * Each log can be thought of as being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
+ * "dirty" section that has not yet been cleaned. The active log segment is always excluded from cleaning.
+ *
+ * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy
+ * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
+ *
+ * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. For memory efficiency this mapping
+ * is approximate. That is, it is allowed to lose some key=>offset pairs, but never to return a wrong answer. See kafka.log.OffsetMap for details of
+ * the implementation of the mapping.
+ *
+ * Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a
+ * higher offset than what is found in the segment (i.e. messages with a key that appears in the dirty section of the log).
+ *
+ * To avoid segments shrinking to very small sizes with repeated cleanings, we merge successive segments when doing a cleaning
+ * if their combined log and index sizes are less than the maximum log and index size prior to the clean beginning.
+ *
+ * Cleaned segments are swapped into the log as they become available.
+ *
+ * One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
+ *
+ * @param config Configuration parameters for the cleaner
+ * @param logDirs The directories where offset checkpoints reside
+ * @param logs The pool of logs
+ * @param time A way to control the passage of time
+ */
+class LogCleaner(val config: CleanerConfig,
+ val logDirs: Array[File],
+ val logs: Pool[TopicAndPartition, Log],
+ time: Time = SystemTime) extends Logging {
+
+ /* the offset checkpoints holding the last cleaned point for each log */
+ private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+
+ /* the set of logs currently being cleaned */
+ private val inProgress = mutable.HashSet[TopicAndPartition]()
+
+ /* a global lock used to control all access to the in-progress set and the offset checkpoints */
+ private val lock = new Object
+
+ /* a counter for creating unique thread names*/
+ private val threadId = new AtomicInteger(0)
+
+ /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
+ private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
+ checkIntervalMs = 300,
+ throttleDown = true,
+ time = time)
+
+ /* the threads */
+ private val cleaners = (0 until config.numThreads).map(_ => new CleanerThread())
+
+ /* a hook for testing to synchronize on log cleaning completions */
+ private val cleaned = new Semaphore(0)
+
+ /**
+ * Start the background cleaning
+ */
+ def startup() {
+ info("Starting the log cleaner")
+ cleaners.foreach(_.start())
+ }
+
+ /**
+ * Stop the background cleaning
+ */
+ def shutdown() {
+ info("Shutting down the log cleaner.")
+ cleaners.foreach(_.interrupt())
+ cleaners.foreach(_.join())
+ }
+
+ /**
+ * For testing, a way to know when work has completed. This method blocks until the
+ * cleaner has processed up to the given offset on the specified topic/partition
+ */
+ def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
+ while(!allCleanerCheckpoints.contains(TopicAndPartition(topic, part)))
+ cleaned.tryAcquire(timeout, TimeUnit.MILLISECONDS)
+ }
+
+ /**
+ * @return the position processed for all logs.
+ */
+ def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
+ checkpoints.values.flatMap(_.read()).toMap
+
+ /**
+ * Choose the log to clean next and add it to the in-progress set. We recompute this
+ * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
+ * the log manager maintains.
+ */
+ private def grabFilthiestLog(): Option[LogToClean] = {
+ lock synchronized {
+ val lastClean = allCleanerCheckpoints()
+ val cleanableLogs = logs.filter(l => l._2.config.dedupe) // skip any logs marked for delete rather than dedupe
+ .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
+ .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
+ val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0) // must have some bytes
+ .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+ if(dirtyLogs.isEmpty) {
+ None
+ } else {
+ val filthiest = dirtyLogs.max
+ inProgress += filthiest.topicPartition
+ Some(filthiest)
+ }
+ }
+ }
+
+ /**
+ * Save out the endOffset and remove the given log from the in-progress set.
+ */
+ private def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
+ lock synchronized {
+ val checkpoint = checkpoints(dataDir)
+ val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
+ checkpoint.write(offsets)
+ inProgress -= topicAndPartition
+ }
+ cleaned.release()
+ }
+
+ /**
+ * The cleaner threads do the actual log cleaning. Each thread does its cleaning repeatedly by
+ * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
+ */
+ private class CleanerThread extends Thread {
+ val cleaner = new Cleaner(id = threadId.getAndIncrement(),
+ offsetMap = new SkimpyOffsetMap(memory = config.dedupeBufferSize / config.numThreads,
+ maxLoadFactor = config.dedupeBufferLoadFactor,
+ hashAlgorithm = config.hashAlgorithm),
+ ioBufferSize = config.ioBufferSize / config.numThreads / 2,
+ maxIoBufferSize = config.maxMessageSize,
+ throttler = throttler,
+ time = time)
+
+ setName("kafka-log-cleaner-thread-" + cleaner.id)
+ setDaemon(false)
+
+ /**
+ * The main loop for the cleaner thread
+ */
+ override def run() {
+ info("Starting cleaner thread %d...".format(cleaner.id))
+ try {
+ while(!isInterrupted) {
+ cleanOrSleep()
+ }
+ } catch {
+ case e: InterruptedException => // all done
+ case e: Exception =>
+ error("Error in cleaner thread %d:".format(cleaner.id), e)
+ }
+ info("Shutting down cleaner thread %d.".format(cleaner.id))
+ }
+
+ /**
+ * Clean a log if there is a dirty log available, otherwise sleep for a bit
+ */
+ private def cleanOrSleep() {
+ grabFilthiestLog() match {
+ case None =>
+ // there are no cleanable logs, sleep a while
+ time.sleep(config.backOffMs)
+ case Some(cleanable) =>
+ // there's a log, clean it
+ var endOffset = cleanable.firstDirtyOffset
+ try {
+ endOffset = cleaner.clean(cleanable)
+ logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
+ } catch {
+ case e: OptimisticLockFailureException =>
+ info("Cleaning of log was aborted due to colliding truncate operation.")
+ } finally {
+ doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
+ }
+ }
+ }
+
+ /**
+ * Log out statistics on a single run of the cleaner.
+ */
+ def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
+ def mb(bytes: Double) = bytes / (1024*1024)
+ val message =
+ "%n\tLog cleaner %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
+ "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
+ stats.elapsedSecs,
+ mb(stats.bytesRead/stats.elapsedSecs)) +
+ "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead),
+ stats.elapsedIndexSecs,
+ mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
+ 100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) +
+ "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
+ stats.elapsedSecs - stats.elapsedIndexSecs,
+ mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) +
+ "\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) +
+ "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) +
+ "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
+ 100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
+ info(message)
+ }
+
+ }
+}
+
+/**
+ * This class holds the actual logic for cleaning a log
+ * @param id An identifier used for logging
+ * @param offsetMap The map used for deduplication
+ * @param ioBufferSize The size of the disk I/O buffers to use. Memory usage will be 2x this number as there is a read and write buffer.
+ * @param throttler The throttler instance to use for limiting I/O rate.
+ * @param time The time instance
+ */
+private[log] class Cleaner(val id: Int,
+ offsetMap: OffsetMap,
+ ioBufferSize: Int,
+ maxIoBufferSize: Int,
+ throttler: Throttler,
+ time: Time) extends Logging {
+
+ this.logIdent = "Cleaner " + id + ":"
+ val stats = new CleanerStats(time)
+ private var readBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk read I/O
+ private var writeBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk write I/O
+
+ /**
+ * Clean the given log
+ *
+ * @param cleanable The log to be cleaned
+ *
+ * @return The first offset not cleaned
+ */
+ private[log] def clean(cleanable: LogToClean): Long = {
+ stats.clear()
+ val topic = cleanable.topicPartition.topic
+ val part = cleanable.topicPartition.partition
+ info("Beginning cleaning of %s-%d.".format(topic, part))
+ val log = cleanable.log
+ val truncateCount = log.numberOfTruncates
+
+ // build the offset map
+ val upperBoundOffset = math.min(log.activeSegment.baseOffset, cleanable.firstDirtyOffset + offsetMap.capacity)
+ val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
+ stats.indexDone()
+
+ // group the segments and clean the groups
+ for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) {
+ info("Cleaning segments %s for log %s...".format(group.map(_.baseOffset).mkString(","), log.name))
+ cleanSegments(log, group, offsetMap, truncateCount)
+ }
+ stats.allDone()
+ endOffset
+ }
+
+ /**
+ * Clean a group of segments into a single replacement segment
+ *
+ * @param log The log being cleaned
+ * @param segments The group of segments being cleaned
+ * @param map The offset map to use for cleaning segments
+ * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
+ */
+ private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, expectedTruncateCount: Int) {
+ // create a new segment with the suffix .cleaned appended to both the log and index name
+ val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
+ logFile.delete()
+ val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
+ indexFile.delete()
+ val messages = new FileMessageSet(logFile)
+ val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
+ val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)
+
+ // clean segments into the new destination segment
+ for (old <- segments)
+ cleanInto(old, cleaned, map)
+
+ // trim excess index
+ index.trimToValidSize()
+
+ // flush new segment to disk before swap
+ cleaned.flush()
+
+ // swap in new segment
+ info("Swapping in cleaned segment %d for %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
+ try {
+ log.replaceSegments(cleaned, segments, expectedTruncateCount)
+ } catch {
+ case e: OptimisticLockFailureException =>
+ cleaned.delete()
+ throw e
+ }
+ }
+
+ /**
+ * Clean the given source log segment into the destination segment using the key=>offset mapping
+ * provided
+ *
+ * @param source The dirty log segment
+ * @param dest The cleaned log segment
+ * @param map The key=>offset mapping
+ *
+ * TODO: Implement proper compression support
+ */
+ private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap) {
+ var position = 0
+ while (position < source.log.sizeInBytes) {
+ checkDone()
+ // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
+ readBuffer.clear()
+ writeBuffer.clear()
+ val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
+ throttler.maybeThrottle(messages.sizeInBytes)
+ // check each message to see if it is to be retained
+ var messagesRead = 0
+ for (entry <- messages) {
+ messagesRead += 1
+ val size = MessageSet.entrySize(entry.message)
+ position += size
+ stats.readMessage(size)
+ val key = entry.message.key
+ require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
+ val lastOffset = map.get(key)
+ /* retain the record if it isn't present in the map OR it is present but this offset is the highest (and it's not a delete) */
+ val retainRecord = lastOffset < 0 || (entry.offset >= lastOffset && entry.message.payload != null)
+ if (retainRecord) {
+ ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
+ stats.recopyMessage(size)
+ }
+ }
+ // if any messages are to be retained, write them out
+ if(writeBuffer.position > 0) {
+ writeBuffer.flip()
+ val retained = new ByteBufferMessageSet(writeBuffer)
+ dest.append(retained.head.offset, retained)
+ throttler.maybeThrottle(writeBuffer.limit)
+ }
+
+ // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
+ if(readBuffer.limit > 0 && messagesRead == 0)
+ growBuffers()
+ }
+ restoreBuffers()
+ }
+
+ /**
+ * Double the I/O buffer capacity
+ */
+ def growBuffers() {
+ if(readBuffer.capacity >= maxIoBufferSize || writeBuffer.capacity >= maxIoBufferSize)
+ throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxIoBufferSize))
+ val newSize = math.min(this.readBuffer.capacity * 2, maxIoBufferSize)
+ info("Growing cleaner I/O buffers from " + readBuffer.capacity + "bytes to " + newSize + " bytes.")
+ this.readBuffer = ByteBuffer.allocate(newSize)
+ this.writeBuffer = ByteBuffer.allocate(newSize)
+ }
+
+ /**
+ * Restore the I/O buffer capacity to its original size
+ */
+ def restoreBuffers() {
+ if(this.readBuffer.capacity > this.ioBufferSize)
+ this.readBuffer = ByteBuffer.allocate(this.ioBufferSize)
+ if(this.writeBuffer.capacity > this.ioBufferSize)
+ this.writeBuffer = ByteBuffer.allocate(this.ioBufferSize)
+ }
+
+ /**
+ * Group the segments in a log into groups totaling less than a given size. The size is enforced separately for the log data and the index data.
+ * We collect a group of such segments together into a single
+ * destination segment. This prevents segment sizes from shrinking too much.
+ *
+ * @param segments The log segments to group
+ * @param maxSize the maximum size in bytes for the total of all log data in a group
+ * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
+ *
+ * @return A list of grouped segments
+ */
+ private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
+ var grouped = List[List[LogSegment]]()
+ var segs = segments.toList
+ while(!segs.isEmpty) {
+ var group = List(segs.head)
+ var logSize = segs.head.size
+ var indexSize = segs.head.index.sizeInBytes
+ segs = segs.tail
+ while(!segs.isEmpty &&
+ logSize + segs.head.size < maxSize &&
+ indexSize + segs.head.index.sizeInBytes < maxIndexSize) {
+ group = segs.head :: group
+ logSize += segs.head.size
+ indexSize += segs.head.index.sizeInBytes
+ segs = segs.tail
+ }
+ grouped ::= group.reverse
+ }
+ grouped.reverse
+ }
+
+ /**
+ * Build a map of key_hash => offset for the keys in the dirty portion of the log to use in cleaning.
+ * @param log The log to use
+ * @param start The offset at which dirty messages begin
+ * @param end The ending offset for the map that is being built
+ * @param map The map in which to store the mappings
+ *
+ * @return The final offset the map covers
+ */
+ private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
+ map.clear()
+ val segments = log.logSegments(start, end)
+ info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, segments.size, start, end))
+ var offset = segments.head.baseOffset
+ require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
+ for (segment <- segments) {
+ checkDone()
+ offset = buildOffsetMap(segment, map)
+ }
+ info("Offset map for log %s complete.".format(log.name))
+ offset
+ }
+
+ /**
+ * Add the messages in the given segment to the offset map
+ *
+ * @param segment The segment to index
+ * @param map The map in which to store the key=>offset mapping
+ *
+ * @return The final offset covered by the map
+ */
+ private def buildOffsetMap(segment: LogSegment, map: OffsetMap): Long = {
+ var position = 0
+ var offset = segment.baseOffset
+ while (position < segment.log.sizeInBytes) {
+ checkDone()
+ readBuffer.clear()
+ val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
+ throttler.maybeThrottle(messages.sizeInBytes)
+ val startPosition = position
+ for (entry <- messages) {
+ val message = entry.message
+ require(message.hasKey)
+ val size = MessageSet.entrySize(message)
+ position += size
+ map.put(message.key, entry.offset)
+ offset = entry.offset
+ stats.indexMessage(size)
+ }
+ // if we didn't read even one complete message, our read buffer may be too small
+ if(position == startPosition)
+ growBuffers()
+ }
+ restoreBuffers()
+ offset
+ }
+
+ /**
+ * If we aren't running any more throw an AllDoneException
+ */
+ private def checkDone() {
+ if (Thread.currentThread.isInterrupted)
+ throw new InterruptedException
+ }
+}
+
+/**
+ * A simple struct for collecting stats about log cleaning
+ */
+private case class CleanerStats(time: Time = SystemTime) {
+ var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead, messagesWritten = 0L
+ clear()
+
+ def readMessage(size: Int) {
+ messagesRead += 1
+ bytesRead += size
+ }
+
+ def recopyMessage(size: Int) {
+ messagesWritten += 1
+ bytesWritten += size
+ }
+
+ def indexMessage(size: Int) {
+ mapMessagesRead += 1
+ mapBytesRead += size
+ }
+
+ def indexDone() {
+ mapCompleteTime = time.milliseconds
+ }
+
+ def allDone() {
+ endTime = time.milliseconds
+ }
+
+ def elapsedSecs = (endTime - startTime)/1000.0
+
+ def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0
+
+ def clear() {
+ startTime = time.milliseconds
+ mapCompleteTime = -1L
+ endTime = -1L
+ bytesRead = 0L
+ bytesWritten = 0L
+ mapBytesRead = 0L
+ mapMessagesRead = 0L
+ messagesRead = 0L
+ messagesWritten = 0L
+ }
+}
+
+/**
+ * Helper class for a log, its topic/partition, and the last clean position
+ */
+private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {
+ val cleanBytes = log.logSegments(-1, firstDirtyOffset-1).map(_.size).sum
+ val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
+ val cleanableRatio = dirtyBytes / totalBytes.toDouble
+ def totalBytes = cleanBytes + dirtyBytes
+ override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
+}
\ No newline at end of file
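A minimal sketch of wiring the cleaner up by hand from the constructor above; the directory, topic and offset are illustrative, and the pool of logs is assumed to be populated elsewhere (normally by the LogManager):

  import java.io.File
  import kafka.common.TopicAndPartition
  import kafka.log.{CleanerConfig, Log, LogCleaner}
  import kafka.utils.Pool

  val logDirs = Array(new File("/tmp/kafka-logs"))
  val logs = new Pool[TopicAndPartition, Log]()   // filled in by the log manager or test setup
  val cleaner = new LogCleaner(CleanerConfig(numThreads = 1), logDirs, logs)

  cleaner.startup()
  // for tests: block until the cleaner has processed my-topic/0 up to the given offset
  cleaner.awaitCleaned("my-topic", 0, 100L)
  cleaner.shutdown()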
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
new file mode 100644
index 0000000..5a10bef
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import scala.collection._
+import kafka.common._
+
+/**
+ * Configuration settings for a log
+ * @param segmentSize The soft maximum for the size of a segment file in the log
+ * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled
+ * @param flushInterval The number of messages that can be written to the log before a flush is forced
+ * @param flushMs The amount of time the log can have dirty data before a flush is forced
+ * @param retentionSize The approximate total number of bytes this log can use
+ * @param retentionMs The approximate maximum age of the last segment that is retained
+ * @param maxIndexSize The maximum size of an index file
+ * @param indexInterval The approximate number of bytes between index entries
+ * @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
+ * @param minCleanableRatio The minimum ratio of dirty bytes to total bytes in the log required before the log is eligible for cleaning
+ * @param dedupe If true, old segments in this log are deduplicated by key; if false, they are deleted outright when retention limits are hit
+ */
+case class LogConfig(val segmentSize: Int = 1024*1024,
+ val segmentMs: Long = Long.MaxValue,
+ val flushInterval: Long = Long.MaxValue,
+ val flushMs: Long = Long.MaxValue,
+ val retentionSize: Long = Long.MaxValue,
+ val retentionMs: Long = Long.MaxValue,
+ val maxMessageSize: Int = Int.MaxValue,
+ val maxIndexSize: Int = 1024*1024,
+ val indexInterval: Int = 4096,
+ val fileDeleteDelayMs: Long = 60*1000,
+ val minCleanableRatio: Double = 0.5,
+ val dedupe: Boolean = false)
+
+
\ No newline at end of file
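A minimal sketch of expressing a per-topic override with the new LogConfig and handing it to the reworked LogManager constructor shown in the next file; the paths and values are illustrative, and an already-started kafka.utils.Scheduler named scheduler is assumed:

  import java.io.File
  import kafka.log.{CleanerConfig, LogConfig, LogManager}
  import kafka.utils.SystemTime

  val defaults = LogConfig(segmentSize = 128*1024*1024,
                           retentionMs = 7L*24*60*60*1000)
  // topics listed here keep only the latest message per key instead of being pruned by age/size
  val topicConfigs = Map("user-profiles" -> defaults.copy(dedupe = true, minCleanableRatio = 0.3))

  val logManager = new LogManager(logDirs = Array(new File("/tmp/kafka-logs")),
                                  topicConfigs = topicConfigs,
                                  defaultConfig = defaults,
                                  cleanerConfig = CleanerConfig(),
                                  flushCheckMs = 60*1000,
                                  retentionCheckMs = 5*60*1000,
                                  scheduler = scheduler,      // assumed to already be running
                                  time = SystemTime)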
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c5ab8a2..438d802 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -36,32 +36,31 @@ import kafka.server.KafkaConfig
* A background thread handles log retention by periodically truncating excess log segments.
*/
@threadsafe
-class LogManager(val config: KafkaConfig,
+class LogManager(val logDirs: Array[File],
+ val topicConfigs: Map[String, LogConfig],
+ val defaultConfig: LogConfig,
+ val cleanerConfig: CleanerConfig,
+ val flushCheckMs: Long,
+ val retentionCheckMs: Long,
scheduler: Scheduler,
private val time: Time) extends Logging {
val CleanShutdownFile = ".kafka_cleanshutdown"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
- val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
- private val logFileSizeMap = config.logSegmentBytesPerTopicMap
- private val logFlushInterval = config.logFlushIntervalMessages
- private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap
private val logCreationLock = new Object
- private val logRetentionSizeMap = config.logRetentionBytesPerTopicMap
- private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
- private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
- private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
- private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
- private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
-
- this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
private val logs = new Pool[TopicAndPartition, Log]()
createAndValidateLogDirs(logDirs)
private var dirLocks = lockLogDirs(logDirs)
loadLogs(logDirs)
+ private val cleaner: LogCleaner =
+ if(cleanerConfig.enableCleaner)
+ new LogCleaner(cleanerConfig, logDirs, logs, time = time)
+ else
+ null
+
/**
* Create and check validity of the given directories, specifically:
* <ol>
@@ -114,18 +113,11 @@ class LogManager(val config: KafkaConfig,
if(dir.isDirectory){
info("Loading log '" + dir.getName + "'")
val topicPartition = parseTopicPartitionName(dir.getName)
- val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
- val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentBytes)
- val log = new Log(dir,
+ val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
+ val log = new Log(dir,
+ config,
+ needsRecovery,
scheduler,
- maxLogFileSize,
- config.messageMaxBytes,
- logFlushInterval,
- rollIntervalMs,
- needsRecovery,
- config.logIndexSizeMaxBytes,
- config.logIndexIntervalBytes,
- config.logDeleteDelayMs,
time)
val previous = this.logs.put(topicPartition, log)
if(previous != null)
@@ -142,20 +134,41 @@ class LogManager(val config: KafkaConfig,
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
- info("Starting log cleanup with a period of %d ms.".format(logCleanupIntervalMs))
- scheduler.schedule("kafka-log-cleaner",
+ info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
+ scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
- period = logCleanupIntervalMs,
+ period = retentionCheckMs,
TimeUnit.MILLISECONDS)
- info("Starting log flusher with a default period of %d ms with the following overrides: %s."
- .format(config.logFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", ")))
+ info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
- period = config.logFlushSchedulerIntervalMs,
+ period = flushCheckMs,
TimeUnit.MILLISECONDS)
}
+ if(cleanerConfig.enableCleaner)
+ cleaner.startup()
+ }
+
+ /**
+ * Close all the logs
+ */
+ def shutdown() {
+ debug("Shutting down.")
+ try {
+ // stop the cleaner first
+ if(cleaner != null)
+ Utils.swallow(cleaner.shutdown())
+ // close the logs
+ allLogs.foreach(_.close())
+ // mark that the shutdown was clean by creating the clean shutdown marker file
+ logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
+ } finally {
+ // regardless of whether the close succeeded, we need to unlock the data directories
+ dirLocks.foreach(_.destroy())
+ }
+ debug("Shutdown complete.")
}
/**
@@ -197,18 +210,10 @@ class LogManager(val config: KafkaConfig,
val dataDir = nextLogDir()
val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
dir.mkdirs()
- val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
- val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes)
log = new Log(dir,
+ defaultConfig,
+ needsRecovery = false,
scheduler,
- maxLogFileSize,
- config.messageMaxBytes,
- logFlushInterval,
- rollIntervalMs,
- needsRecovery = false,
- config.logIndexSizeMaxBytes,
- config.logIndexIntervalBytes,
- config.logDeleteDelayMs,
time)
info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
logs.put(topicAndPartition, log)
@@ -242,8 +247,7 @@ class LogManager(val config: KafkaConfig,
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
val topic = parseTopicPartitionName(log.name).topic
- val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
- log.deleteOldSegments(startMs - _.lastModified > logCleanupThresholdMs)
+ log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
/**
@@ -252,10 +256,9 @@ class LogManager(val config: KafkaConfig,
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
val topic = parseTopicPartitionName(log.dir.getName).topic
- val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionBytes)
- if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize)
+ if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
return 0
- var diff = log.size - maxLogRetentionSize
+ var diff = log.size - log.config.retentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
@@ -274,7 +277,7 @@ class LogManager(val config: KafkaConfig,
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
- for(log <- allLogs) {
+ for(log <- allLogs; if !log.config.dedupe) {
debug("Garbage collecting '" + log.name + "'")
total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
@@ -283,23 +286,6 @@ class LogManager(val config: KafkaConfig,
}
/**
- * Close all the logs
- */
- def shutdown() {
- debug("Shutting down.")
- try {
- // close the logs
- allLogs.foreach(_.close())
- // mark that the shutdown was clean by creating the clean shutdown marker file
- logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
- } finally {
- // regardless of whether the close succeeded, we need to unlock the data directories
- dirLocks.foreach(_.destroy())
- }
- debug("Shutdown complete.")
- }
-
- /**
* Get all the partition logs
*/
def allLogs(): Iterable[Log] = logs.values
@@ -312,13 +298,9 @@ class LogManager(val config: KafkaConfig,
for ((topicAndPartition, log) <- logs) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
-
- var logFlushInterval = config.logFlushIntervalMs
- if(logFlushIntervals.contains(topicAndPartition.topic))
- logFlushInterval = logFlushIntervals(topicAndPartition.topic)
- debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + logFlushInterval +
+ debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs +
" last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
- if(timeSinceLastFlush >= logFlushInterval)
+ if(timeSinceLastFlush >= log.config.flushMs)
log.flush
} catch {
case e =>
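
The cleanupSegmentsToMaintainSize hunk above walks a log's segments oldest-first and deletes a segment only while the log is still over its retentionSize budget. A self-contained sketch of that walk (illustration only, over plain segment sizes rather than LogSegment objects):

    // Same greedy walk as the shouldDelete closure in cleanupSegmentsToMaintainSize.
    def segmentsToDelete(segmentSizes: Seq[Long], logSize: Long, retentionSize: Long): Seq[Long] = {
      if (retentionSize < 0 || logSize < retentionSize)
        return Seq.empty
      var diff = logSize - retentionSize
      // oldest segments first; stop as soon as the next segment no longer fits in the excess
      segmentSizes.takeWhile { size =>
        val delete = diff - size >= 0
        if (delete) diff -= size
        delete
      }
    }

    // e.g. three 100-byte segments (300 bytes total) with a 150-byte retention budget:
    // only the oldest segment is removed, since the remaining excess (50) is smaller
    // than the next segment.
    segmentsToDelete(Seq(100L, 100L, 100L), logSize = 300L, retentionSize = 150L)  // => Seq(100)
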
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 39dd9c2..120ebeb 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -79,7 +79,7 @@ class LogSegment(val log: FileMessageSet,
* @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
*/
@threadsafe
- private def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
+ private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
val mapping = index.lookup(offset)
log.searchFor(offset, max(mapping.position, startingFilePosition))
}
@@ -168,18 +168,20 @@ class LogSegment(val log: FileMessageSet,
* Truncate off all index and log entries with offsets >= the given offset.
* If the given offset is larger than the largest message in this segment, do nothing.
* @param offset The offset to truncate to
+ * @return The number of log bytes truncated
*/
@nonthreadsafe
- def truncateTo(offset: Long) {
+ def truncateTo(offset: Long): Int = {
val mapping = translateOffset(offset)
if(mapping == null)
- return
+ return 0
index.truncateTo(offset)
// after truncation, reset and allocate more space for the (new currently active) index
index.resize(index.maxIndexSize)
- log.truncateTo(mapping.position)
- if (log.sizeInBytes == 0)
+ val bytesTruncated = log.truncateTo(mapping.position)
+ if(log.sizeInBytes == 0)
created = time.milliseconds
+ bytesTruncated
}
/**
@@ -211,6 +213,18 @@ class LogSegment(val log: FileMessageSet,
}
/**
+ * Change the suffix for the index and log file for this log segment
+ */
+ def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
+ val logRenamed = log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
+ if(!logRenamed)
+ throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
+ val indexRenamed = index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
+ if(!indexRenamed)
+ throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
+ }
+
+ /**
* Close this log segment
*/
def close() {
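
The new changeFileSuffixes above renames both the log and index files via Utils.replaceSuffix, which is not shown in this section. Its effect on the file names can be sketched with a hypothetical stand-in (not the commit's code):

    // Hypothetical stand-in for Utils.replaceSuffix, only to illustrate the rename
    // performed by changeFileSuffixes; the real helper lives in kafka.utils.Utils.
    def replaceSuffix(path: String, oldSuffix: String, newSuffix: String): String = {
      require(path.endsWith(oldSuffix), path + " does not end with " + oldSuffix)
      path.substring(0, path.length - oldSuffix.length) + newSuffix
    }

    // e.g. marking a segment while it is being rewritten (suffix values here are
    // illustrative assumptions, not taken from this diff):
    replaceSuffix("00000000000000000000.log", "", ".cleaned")
    // => "00000000000000000000.log.cleaned"
    replaceSuffix("00000000000000000000.log.cleaned", ".cleaned", "")
    // => "00000000000000000000.log"
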
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 1662f10..23e659f 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -49,7 +49,7 @@ import kafka.utils._
* All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
* storage format.
*/
-class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
+class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
/* initialize the memory mapping for this index */
private var mmap: MappedByteBuffer =
@@ -83,20 +83,15 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
Utils.swallow(raf.close())
}
}
-
- info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
- .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
-
- /**
- * The maximum number of eight-byte entries this index can hold
- */
- def maxEntries = mmap.limit / 8
/* the number of eight-byte entries currently in the index */
private var size = new AtomicInteger(mmap.position / 8)
/* the last offset in the index */
var lastOffset = readLastOffset()
+
+ info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
+ .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
/**
* The last offset written to the index
@@ -109,6 +104,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
}
baseOffset + offset
}
+
+ /**
+ * The maximum number of eight-byte entries this index can hold
+ */
+ def maxEntries = mmap.limit / 8
/**
* Find the largest offset less than or equal to the given targetOffset
@@ -284,12 +284,27 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
/** The number of entries in this index */
def entries() = size.get
+ /**
+ * The number of bytes actually used by this index
+ */
+ def sizeInBytes() = 8 * entries
+
/** Close the index */
def close() {
trimToValidSize()
}
/**
+ * Rename the file that backs this offset index
+ * @return true iff the rename was successful
+ */
+ def renameTo(f: File): Boolean = {
+ val success = this.file.renameTo(f)
+ this.file = f
+ success
+ }
+
+ /**
* Round a number to the greatest exact multiple of the given factor less than the given number.
* E.g. roundToExactMultiple(67, 8) == 64
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
new file mode 100644
index 0000000..6236813
--- /dev/null
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.util.Arrays
+import java.security.MessageDigest
+import java.nio.ByteBuffer
+import kafka.utils._
+
+trait OffsetMap {
+ def capacity: Int
+ def put(key: ByteBuffer, offset: Long)
+ def get(key: ByteBuffer): Long
+ def clear()
+ def size: Int
+ def utilization: Double = size.toDouble / capacity
+}
+
+/**
+ * An approximate map used for deduplicating the log.
+ * @param memory The amount of memory this map can use
+ * @param maxLoadFactor The maximum percent full this offset map can be
+ * @param hashAlgorithm The hash algorithm instance to use: MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512
+ */
+@nonthreadsafe
+class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgorithm: String = "MD5") extends OffsetMap {
+ private val bytes = ByteBuffer.allocate(memory)
+
+ /* the hash algorithm instance to use, default is MD5 */
+ private val digest = MessageDigest.getInstance(hashAlgorithm)
+
+ /* the number of bytes for this hash algorithm */
+ private val hashSize = digest.getDigestLength
+
+ /* create some hash buffers to avoid reallocating each time */
+ private val hash1 = new Array[Byte](hashSize)
+ private val hash2 = new Array[Byte](hashSize)
+
+ /* number of entries put into the map */
+ private var entries = 0
+
+ /* a byte added as a prefix to all keys to make collisions non-static in repeated uses. Changed in clear(). */
+ private var salt: Byte = 0
+
+ /**
+ * The number of bytes of space each entry uses (the number of bytes in the hash plus an 8-byte offset)
+ */
+ val bytesPerEntry = hashSize + 8
+
+ /**
+ * The maximum number of entries this map can contain before it exceeds the max load factor
+ */
+ override val capacity: Int = (maxLoadFactor * memory / bytesPerEntry).toInt
+
+ /**
+ * Associate an offset with a key.
+ * @param key The key
+ * @param offset The offset
+ */
+ override def put(key: ByteBuffer, offset: Long) {
+ if(size + 1 > capacity)
+ throw new IllegalStateException("Attempt to add to a full offset map with a maximum capacity of %d.".format(capacity))
+ hash(key, hash1)
+ bytes.position(offsetFor(hash1))
+ bytes.put(hash1)
+ bytes.putLong(offset)
+ entries += 1
+ }
+
+ /**
+ * Get the offset associated with this key. This method is approximate:
+ * it may not find an offset previously stored, but cannot give a wrong offset.
+ * @param key The key
+ * @return The offset associated with this key or -1 if the key is not found
+ */
+ override def get(key: ByteBuffer): Long = {
+ hash(key, hash1)
+ bytes.position(offsetFor(hash1))
+ bytes.get(hash2)
+ // if the computed hash equals the stored hash return the stored offset
+ if(Arrays.equals(hash1, hash2))
+ bytes.getLong()
+ else
+ -1L
+ }
+
+ /**
+ * Change the salt used for key hashing, making all existing keys unfindable.
+ * Doesn't actually zero out the array.
+ */
+ override def clear() {
+ this.entries = 0
+ this.salt = (this.salt + 1).toByte
+ Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
+ }
+
+ /**
+ * The number of entries put into the map (note that not all may remain)
+ */
+ override def size: Int = entries
+
+ /**
+ * Choose a slot in the array for this hash
+ */
+ private def offsetFor(hash: Array[Byte]): Int =
+ bytesPerEntry * (Utils.abs(Utils.readInt(hash, 0)) % capacity)
+
+ /**
+ * Compute the hash of the given key and store it in the given buffer
+ * @param key The key to hash
+ * @param buffer The buffer to store the hash into
+ */
+ private def hash(key: ByteBuffer, buffer: Array[Byte]) {
+ key.mark()
+ digest.update(salt)
+ digest.update(key)
+ key.reset()
+ digest.digest(buffer, 0, hashSize)
+ }
+
+}
\ No newline at end of file
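
Since SkimpyOffsetMap is new in this commit, a short usage sketch may help reviewers; keys and offsets below are made up for illustration. Note that size counts puts rather than distinct keys, and get returns -1 for a key that was never stored (or whose slot was lost to a hash collision):

    import java.nio.ByteBuffer
    import kafka.log.SkimpyOffsetMap

    def key(s: String) = ByteBuffer.wrap(s.getBytes("UTF-8"))

    // ~16 KB of memory, 75% max load factor, MD5 hashing: 24 bytes per entry, capacity 512
    val map = new SkimpyOffsetMap(memory = 16 * 1024, maxLoadFactor = 0.75)
    map.put(key("user-17"), 100L)
    map.put(key("user-17"), 250L)   // same key again: the slot is overwritten with the later offset
    map.get(key("user-17"))         // => 250
    map.get(key("user-42"))         // => -1, never stored
    map.size                        // => 2 (counts puts, not distinct keys)
    map.utilization                 // => 2 / 512, about 0.004
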
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 873699f..1b9c8f8 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -73,7 +73,7 @@ object ByteBufferMessageSet {
new ByteBufferMessageSet(outputBuffer)
}
- private def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
+ private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) {
buffer.putLong(offset)
buffer.putInt(message.size)
buffer.put(message.buffer)
@@ -150,7 +150,7 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
return allDone()
val offset = topIter.getLong()
val size = topIter.getInt()
- if(size < 0)
+ if(size < 0 || size < Message.MinHeaderSize)
throw new InvalidMessageException("Message found with corrupt size (" + size + ")")
// we have an incomplete message
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
deleted file mode 100644
index 5aa0141..0000000
--- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import kafka.utils.Logging
-import kafka.common._
-import java.util.concurrent.locks.ReentrantLock
-import java.io._
-
-/**
- * This class handles the read/write to the highwaterMark checkpoint file. The file stores the high watermark value for
- * all topics and partitions that this broker hosts. The format of this file is as follows -
- * version
- * number of entries
- * topic partition highwatermark
- */
-
-object HighwaterMarkCheckpoint {
- val highWatermarkFileName = "replication-offset-checkpoint"
- val currentHighwaterMarkFileVersion = 0
-}
-
-class HighwaterMarkCheckpoint(val path: String) extends Logging {
- /* create the highwatermark file handle for all partitions */
- val name = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName
- private val hwFile = new File(name)
- private val hwFileLock = new ReentrantLock()
- // recover from previous tmp file, if required
-
- def write(highwaterMarksPerPartition: Map[TopicAndPartition, Long]) {
- hwFileLock.lock()
- try {
- // write to temp file and then swap with the highwatermark file
- val tempHwFile = new File(hwFile + ".tmp")
-
- val hwFileWriter = new BufferedWriter(new FileWriter(tempHwFile))
- // checkpoint highwatermark for all partitions
- // write the current version
- hwFileWriter.write(HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion.toString)
- hwFileWriter.newLine()
- // write the number of entries in the highwatermark file
- hwFileWriter.write(highwaterMarksPerPartition.size.toString)
- hwFileWriter.newLine()
-
- highwaterMarksPerPartition.foreach { partitionAndHw =>
- hwFileWriter.write("%s %s %s".format(partitionAndHw._1.topic, partitionAndHw._1.partition, partitionAndHw._2))
- hwFileWriter.newLine()
- }
- hwFileWriter.flush()
- hwFileWriter.close()
- // swap new high watermark file with previous one
- if(!tempHwFile.renameTo(hwFile)) {
- fatal("Attempt to swap the new high watermark file with the old one failed")
- System.exit(1)
- }
- }finally {
- hwFileLock.unlock()
- }
- }
-
- def read(topic: String, partition: Int): Long = {
- hwFileLock.lock()
- try {
- hwFile.length() match {
- case 0 =>
- warn("No highwatermark file is found. Returning 0 as the highwatermark for topic %s partition %d."
- .format(topic, partition))
- 0L
- case _ =>
- val hwFileReader = new BufferedReader(new FileReader(hwFile))
- val version = hwFileReader.readLine().toShort
- version match {
- case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion =>
- val numberOfHighWatermarks = hwFileReader.readLine().toInt
- val partitionHighWatermarks =
- for(i <- 0 until numberOfHighWatermarks) yield {
- val nextHwEntry = hwFileReader.readLine()
- val partitionHwInfo = nextHwEntry.split(" ")
- val topic = partitionHwInfo(0)
- val partitionId = partitionHwInfo(1).toInt
- val highWatermark = partitionHwInfo(2).toLong
- (TopicAndPartition(topic, partitionId) -> highWatermark)
- }
- hwFileReader.close()
- val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))
- hwOpt match {
- case Some(hw) =>
- debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file".format(hw, topic, partition))
- hw
- case None =>
- warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
- "partition %d. Returning 0 as the highwatermark".format(partition))
- 0L
- }
- case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
- System.exit(1)
- -1L
- }
- }
- }finally {
- hwFileLock.unlock()
- }
- }
-}
\ No newline at end of file
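
This checkpointing logic is generalized elsewhere in this commit rather than dropped. For reference, the on-disk format the deleted class wrote was a version line, an entry count, and one "topic partition highwatermark" line per partition; for example (topic names and values invented for illustration):

    0
    2
    my-topic 0 42
    my-topic 1 17
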