You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2013/03/12 19:18:35 UTC
git commit: KAFKA-739 Handle null message payloads in messages and in
the log cleaner. Reviewed by Jun and Neha.
Updated Branches:
refs/heads/trunk c1ed12e44 -> 9ff4e8eb1
KAFKA-739 Handle null message payloads in messages and in the log cleaner. Reviewed by Jun and Neha.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9ff4e8eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9ff4e8eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9ff4e8eb
Branch: refs/heads/trunk
Commit: 9ff4e8eb10e0ddd86f257e99d55971a132426605
Parents: c1ed12e
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Mar 12 11:17:12 2013 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Mar 12 11:17:12 2013 -0700
----------------------------------------------------------------------
.../java/kafka/etl/impl/SimpleKafkaETLMapper.java | 2 +
.../scala/kafka/consumer/ConsumerIterator.scala | 2 +-
core/src/main/scala/kafka/log/CleanerConfig.scala | 4 +-
core/src/main/scala/kafka/log/FileMessageSet.scala | 3 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 104 ++++++---
core/src/main/scala/kafka/log/LogConfig.scala | 6 +
core/src/main/scala/kafka/log/LogSegment.scala | 9 +-
core/src/main/scala/kafka/log/OffsetMap.scala | 103 ++++++---
core/src/main/scala/kafka/message/Message.scala | 16 +-
.../kafka/producer/async/DefaultEventHandler.scala | 5 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 14 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +
.../main/scala/kafka/tools/DumpLogSegments.scala | 5 +-
.../scala/kafka/tools/SimpleConsumerShell.scala | 2 +-
.../main/scala/kafka/utils/IteratorTemplate.scala | 17 +-
.../scala/kafka/utils/VerifiableProperties.scala | 6 +-
.../test/scala/other/kafka/TestLogCleaning.scala | 198 +++++++++++----
.../test/scala/unit/kafka/log/CleanerTest.scala | 66 +++--
core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +
.../test/scala/unit/kafka/log/OffsetMapTest.scala | 43 ++--
.../scala/unit/kafka/message/MessageTest.scala | 9 +-
.../scala/unit/kafka/producer/ProducerTest.scala | 25 ++-
.../unit/kafka/utils/IteratorTemplateTest.scala | 41 +++
23 files changed, 504 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
index b0aadff..51b6adf 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/SimpleKafkaETLMapper.java
@@ -41,6 +41,8 @@ Mapper<KafkaETLKey, BytesWritable, LongWritable, Text> {
protected Text getData(Message message) throws IOException {
ByteBuffer buf = message.payload();
+ if(buf == null)
+ return new Text();
byte[] array = new byte[buf.limit()];
buf.get(array);
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 746a4bd..963a3a9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -108,7 +108,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
val keyBuffer = item.message.key
val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer))
- val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
+ val value = if(item.message.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(item.message.payload))
new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId, item.offset)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/CleanerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
index 999fee8..fa946ad 100644
--- a/core/src/main/scala/kafka/log/CleanerConfig.scala
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -30,8 +30,8 @@ package kafka.log
* @param hashAlgorithm The hash algorithm to use in key comparison.
*/
case class CleanerConfig(val numThreads: Int = 1,
- val dedupeBufferSize: Int = 4*1024*1024,
- val dedupeBufferLoadFactor: Double = 0.75,
+ val dedupeBufferSize: Long = 4*1024*1024L,
+ val dedupeBufferLoadFactor: Double = 0.9d,
val ioBufferSize: Int = 1024*1024,
val maxMessageSize: Int = 32*1024*1024,
val maxIoBytesPerSecond: Double = Double.MaxValue,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 0eef33e..abb160c 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -159,13 +159,14 @@ class FileMessageSet private[kafka](@volatile var file: File,
def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
var location = start
+ val sizeOffsetBuffer = ByteBuffer.allocate(12)
override def makeNext(): MessageAndOffset = {
if(location >= end)
return allDone()
// read the size of the item
- val sizeOffsetBuffer = ByteBuffer.allocate(12)
+ sizeOffsetBuffer.rewind()
channel.read(sizeOffsetBuffer, location)
if(sizeOffsetBuffer.hasRemaining)
return allDone()
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 368a12b..ccde2ab 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -23,6 +23,7 @@ import java.nio._
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
+import java.util.Date
import java.io.File
import kafka.common._
import kafka.message._
@@ -39,8 +40,7 @@ import kafka.utils._
* The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy
* and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
*
- * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. For memory efficiency this mapping
- * is approximate. That is allowed to lose some key=>offset pairs, but never to return a wrong answer. See kafka.log.OffsetMap for details of
+ * To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of
* the implementation of the mapping.
*
* Once the key=>offset map is built, the log is cleaned by recopying each log segment but omitting any key that appears in the offset map with a
@@ -53,6 +53,11 @@ import kafka.utils._
*
* One nuance that the cleaner must handle is log truncation. If a log is truncated while it is being cleaned the cleaning of that log is aborted.
*
+ * Messages with null payload are treated as deletes for the purpose of log compaction. This means that they receive special treatment by the cleaner.
+ * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
+ * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
+ * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
+ *
* @param config Configuration parameters for the cleaner
* @param logDirs The directories where offset checkpoints reside
* @param logs The pool of logs
@@ -62,7 +67,7 @@ class LogCleaner(val config: CleanerConfig,
val logDirs: Array[File],
val logs: Pool[TopicAndPartition, Log],
time: Time = SystemTime) extends Logging {
-
+
/* the offset checkpoints holding the last cleaned point for each log */
private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
@@ -160,12 +165,14 @@ class LogCleaner(val config: CleanerConfig,
* choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
*/
private class CleanerThread extends Thread {
+ if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
+ warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")
val cleaner = new Cleaner(id = threadId.getAndIncrement(),
- offsetMap = new SkimpyOffsetMap(memory = config.dedupeBufferSize / config.numThreads,
- maxLoadFactor = config.dedupeBufferLoadFactor,
+ offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
hashAlgorithm = config.hashAlgorithm),
ioBufferSize = config.ioBufferSize / config.numThreads / 2,
maxIoBufferSize = config.maxMessageSize,
+ dupBufferLoadFactor = config.dedupeBufferLoadFactor,
throttler = throttler,
time = time)
@@ -251,13 +258,20 @@ private[log] class Cleaner(val id: Int,
offsetMap: OffsetMap,
ioBufferSize: Int,
maxIoBufferSize: Int,
+ dupBufferLoadFactor: Double,
throttler: Throttler,
time: Time) extends Logging {
- this.logIdent = "Cleaner " + id + ":"
+ this.logIdent = "Cleaner " + id + ": "
+
+ /* stats on this cleaning */
val stats = new CleanerStats(time)
- private var readBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk read I/O
- private var writeBuffer = ByteBuffer.allocate(ioBufferSize) // buffer for disk write I/O
+
+ /* buffer used for read i/o */
+ private var readBuffer = ByteBuffer.allocate(ioBufferSize)
+
+ /* buffer used for write i/o */
+ private var writeBuffer = ByteBuffer.allocate(ioBufferSize)
/**
* Clean the given log
@@ -268,22 +282,29 @@ private[log] class Cleaner(val id: Int,
*/
private[log] def clean(cleanable: LogToClean): Long = {
stats.clear()
- val topic = cleanable.topicPartition.topic
- val part = cleanable.topicPartition.partition
- info("Beginning cleaning of %s-%d.".format(topic, part))
+ info("Beginning cleaning of log %s.".format(cleanable.log.name))
val log = cleanable.log
val truncateCount = log.numberOfTruncates
// build the offset map
- val upperBoundOffset = math.min(log.activeSegment.baseOffset, cleanable.firstDirtyOffset + offsetMap.capacity)
+ info("Building offset map for %s...".format(cleanable.log.name))
+ val upperBoundOffset = log.activeSegment.baseOffset
val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
stats.indexDone()
- // group the segments and clean the groups
- for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) {
- info("Cleaning segments %s for log %s...".format(group.map(_.baseOffset).mkString(","), log.name))
- cleanSegments(log, group, offsetMap, truncateCount)
+ // figure out the timestamp below which it is safe to remove delete tombstones
+ // this horizon is defined to be log.config.deleteRetentionMs before the last modified time of the last clean segment (or 0 if there is no clean segment)
+ val deleteHorizonMs =
+ log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
+ case None => 0L
+ case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
}
+
+ // group the segments and clean the groups
+ info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
+ for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
+ cleanSegments(log, group, offsetMap, truncateCount, deleteHorizonMs)
+
stats.allDone()
endOffset
}
@@ -295,8 +316,13 @@ private[log] class Cleaner(val id: Int,
* @param segments The group of segments being cleaned
* @param map The offset map to use for cleaning segments
* @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet
+ * @param deleteHorizonMs Timestamp (ms) acting as the tombstone horizon: segments last modified at or before it have their delete tombstones discarded
*/
- private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, expectedTruncateCount: Int) {
+ private[log] def cleanSegments(log: Log,
+ segments: Seq[LogSegment],
+ map: OffsetMap,
+ expectedTruncateCount: Int,
+ deleteHorizonMs: Long) {
// create a new segment with the suffix .cleaned appended to both the log and index name
val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
logFile.delete()
@@ -307,17 +333,25 @@ private[log] class Cleaner(val id: Int,
val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time)
// clean segments into the new destination segment
- for (old <- segments)
- cleanInto(old, cleaned, map)
+ for (old <- segments) {
+ val retainDeletes = old.lastModified > deleteHorizonMs
+ info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
+ .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
+ cleanInto(old, cleaned, map, retainDeletes)
+ }
// trim excess index
index.trimToValidSize()
// flush new segment to disk before swap
cleaned.flush()
+
+ // update the modification date to retain the last modified date of the original files
+ val modified = segments.last.lastModified
+ cleaned.lastModified = modified
// swap in new segment
- info("Swapping in cleaned segment %d for %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
+ info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
try {
log.replaceSegments(cleaned, segments, expectedTruncateCount)
} catch {
@@ -334,10 +368,11 @@ private[log] class Cleaner(val id: Int,
* @param source The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
+ * @param retainDeletes Should delete tombstones be retained while cleaning this segment
*
* TODO: Implement proper compression support
*/
- private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap) {
+ private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) {
var position = 0
while (position < source.log.sizeInBytes) {
checkDone()
@@ -355,10 +390,14 @@ private[log] class Cleaner(val id: Int,
stats.readMessage(size)
val key = entry.message.key
require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
- val lastOffset = map.get(key)
- /* retain the record if it isn't present in the map OR it is present but this offset is the highest (and it's not a delete) */
- val retainRecord = lastOffset < 0 || (entry.offset >= lastOffset && entry.message.payload != null)
- if (retainRecord) {
+ val foundOffset = map.get(key)
+ /* two cases in which we can get rid of a message:
+ * 1) if there exists a message with the same key but higher offset
+ * 2) if the message is a delete "tombstone" marker and enough time has passed
+ */
+ val redundant = foundOffset >= 0 && entry.offset < foundOffset
+ val obsoleteDelete = !retainDeletes && entry.message.isNull
+ if (!redundant && !obsoleteDelete) {
ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
stats.recopyMessage(size)
}
@@ -443,13 +482,18 @@ private[log] class Cleaner(val id: Int,
*/
private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
map.clear()
- val segments = log.logSegments(start, end)
- info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, segments.size, start, end))
- var offset = segments.head.baseOffset
+ val dirty = log.logSegments(start, end).toSeq
+ info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
+
+ // Add the dirty segments. We always take segments whose base offset is at most start + map.slots * load_factor,
+ // and take later ones only while map utilization stays below the load factor (possible when there is lots of duplication in the dirty section of the log)
+ var offset = dirty.head.baseOffset
require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
- for (segment <- segments) {
+ val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
+ for (segment <- dirty) {
checkDone()
- offset = buildOffsetMap(segment, map)
+ if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
+ offset = buildOffsetMap(segment, map)
}
info("Offset map for log %s complete.".format(log.name))
offset
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index dc42a74..48660bc 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -32,6 +32,7 @@ import kafka.common._
* @param maxIndexSize The maximum size of an index file
* @param indexInterval The approximate number of bytes between index entries
* @param fileDeleteDelayMs The time to wait before deleting a file from the filesystem
+ * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
* @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
* @param dedupe Should old segments in this log be deleted or deduplicated?
*/
@@ -45,6 +46,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
val maxIndexSize: Int = 1024*1024,
val indexInterval: Int = 4096,
val fileDeleteDelayMs: Long = 60*1000,
+ val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
val minCleanableRatio: Double = 0.5,
val dedupe: Boolean = false) {
@@ -60,6 +62,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
props.put(RententionMsProp, retentionMs.toString)
props.put(MaxMessageBytesProp, maxMessageSize.toString)
props.put(IndexIntervalBytesProp, indexInterval.toString)
+ props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
props.put(CleanupPolicyProp, if(dedupe) "dedupe" else "delete")
@@ -78,6 +81,7 @@ object LogConfig {
val RententionMsProp = "retention.ms"
val MaxMessageBytesProp = "max.message.bytes"
val IndexIntervalBytesProp = "index.interval.bytes"
+ val DeleteRetentionMsProp = "delete.retention.ms"
val FileDeleteDelayMsProp = "file.delete.delay.ms"
val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
val CleanupPolicyProp = "cleanup.policy"
@@ -92,6 +96,7 @@ object LogConfig {
MaxMessageBytesProp,
IndexIntervalBytesProp,
FileDeleteDelayMsProp,
+ DeleteRetentionMsProp,
MinCleanableDirtyRatioProp,
CleanupPolicyProp)
@@ -110,6 +115,7 @@ object LogConfig {
maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
+ deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
dedupe = props.getProperty(CleanupPolicyProp).trim.toLowerCase == "dedupe")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 120ebeb..30d2e91 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -249,5 +249,12 @@ class LogSegment(val log: FileMessageSet,
* The last modified time of this log segment as a unix time stamp
*/
def lastModified = log.file.lastModified
-
+
+ /**
+ * Change the last modified time for this log segment
+ */
+ def lastModified_=(ms: Long) = {
+ log.file.setLastModified(ms)
+ index.file.setLastModified(ms)
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 6236813..42cdfbb 100644
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -23,22 +23,22 @@ import java.nio.ByteBuffer
import kafka.utils._
trait OffsetMap {
- def capacity: Int
+ def slots: Int
def put(key: ByteBuffer, offset: Long)
def get(key: ByteBuffer): Long
def clear()
def size: Int
- def utilization: Double = size.toDouble / capacity
+ def utilization: Double = size.toDouble / slots
}
/**
- * An approximate map used for deduplicating the log.
+ * A hash table used for deduplicating the log. This hash table uses a cryptographic hash of the key as a proxy for the key
+ * for comparisons and to save space on object overhead. Collisions are resolved by probing. This hash table does not support deletes.
* @param memory The amount of memory this map can use
- * @param maxLoadFactor The maximum percent full this offset map can be
* @param hashAlgorithm The hash algorithm instance to use: MD2, MD5, SHA-1, SHA-256, SHA-384, SHA-512
*/
@nonthreadsafe
-class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgorithm: String = "MD5") extends OffsetMap {
+class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
private val bytes = ByteBuffer.allocate(memory)
/* the hash algorithm instance to use, defualt is MD5 */
@@ -54,8 +54,11 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori
/* number of entries put into the map */
private var entries = 0
- /* a byte added as a prefix to all keys to make collisions non-static in repeated uses. Changed in clear(). */
- private var salt: Byte = 0
+ /* number of lookups on the map */
+ private var lookups = 0L
+
+ /* the number of probes for all lookups */
+ private var probes = 0L
/**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
@@ -63,40 +66,66 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori
val bytesPerEntry = hashSize + 8
/**
- * The maximum number of entries this map can contain before it exceeds the max load factor
+ * The maximum number of entries this map can contain
*/
- override val capacity: Int = (maxLoadFactor * memory / bytesPerEntry).toInt
+ val slots: Int = (memory / bytesPerEntry).toInt
/**
- * Associate a offset with a key.
+ * Associate this offset to the given key.
* @param key The key
* @param offset The offset
*/
override def put(key: ByteBuffer, offset: Long) {
- if(size + 1 > capacity)
- throw new IllegalStateException("Attempt to add to a full offset map with a maximum capacity of %d.".format(capacity))
- hash(key, hash1)
- bytes.position(offsetFor(hash1))
+ require(entries < slots, "Attempt to add a new entry to a full offset map.")
+ lookups += 1
+ hashInto(key, hash1)
+ // probe until we find the first empty slot
+ var attempt = 0
+ var pos = positionOf(hash1, attempt)
+ while(!isEmpty(pos)) {
+ bytes.position(pos)
+ bytes.get(hash2)
+ if(Arrays.equals(hash1, hash2)) {
+ // we found an existing entry, overwrite it and return (size does not change)
+ bytes.putLong(offset)
+ return
+ }
+ attempt += 1
+ pos = positionOf(hash1, attempt)
+ }
+ // found an empty slot, update it--size grows by 1
+ bytes.position(pos)
bytes.put(hash1)
bytes.putLong(offset)
entries += 1
}
/**
- * Get the offset associated with this key. This method is approximate,
- * it may not find an offset previously stored, but cannot give a wrong offset.
+ * Check that there is no entry at the given position
+ */
+ private def isEmpty(position: Int): Boolean =
+ bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && bytes.getLong(position + 16) == 0
+
+ /**
+ * Get the offset associated with this key.
* @param key The key
* @return The offset associated with this key or -1 if the key is not found
*/
override def get(key: ByteBuffer): Long = {
- hash(key, hash1)
- bytes.position(offsetFor(hash1))
- bytes.get(hash2)
- // if the computed hash equals the stored hash return the stored offset
- if(Arrays.equals(hash1, hash2))
- bytes.getLong()
- else
- -1L
+ lookups += 1
+ hashInto(key, hash1)
+ // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot
+ var attempt = 0
+ var pos = 0
+ do {
+ pos = positionOf(hash1, attempt)
+ bytes.position(pos)
+ if(isEmpty(pos))
+ return -1L
+ bytes.get(hash2)
+ attempt += 1
+ } while(!Arrays.equals(hash1, hash2))
+ bytes.getLong()
}
/**
@@ -105,7 +134,8 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori
*/
override def clear() {
this.entries = 0
- this.salt = (this.salt + 1).toByte
+ this.lookups = 0L
+ this.probes = 0L
Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
}
@@ -115,19 +145,32 @@ class SkimpyOffsetMap(val memory: Int, val maxLoadFactor: Double, val hashAlgori
override def size: Int = entries
/**
- * Choose a slot in the array for this hash
+ * The rate of collisions in the lookups
*/
- private def offsetFor(hash: Array[Byte]): Int =
- bytesPerEntry * (Utils.abs(Utils.readInt(hash, 0)) % capacity)
+ def collisionRate: Double =
+ (this.probes - this.lookups) / this.lookups.toDouble
+
+ /**
+ * Calculate the ith probe position. We first try reading successive integers from the hash itself
+ * then if all of those fail we degrade to linear probing.
+ * @param hash The hash of the key to find the position for
+ * @param attempt The ith probe
+ * @return The byte offset in the buffer at which the ith probing for the given hash would reside
+ */
+ private def positionOf(hash: Array[Byte], attempt: Int): Int = {
+ val probe = Utils.readInt(hash, math.min(attempt, hashSize - 4)) + math.max(0, attempt - hashSize + 4)
+ val slot = Utils.abs(probe) % slots
+ this.probes += 1
+ slot * bytesPerEntry
+ }
/**
* The offset at which we have stored the given key
* @param key The key to hash
* @param buffer The buffer to store the hash into
*/
- private def hash(key: ByteBuffer, buffer: Array[Byte]) {
+ private def hashInto(key: ByteBuffer, buffer: Array[Byte]) {
key.mark()
- digest.update(salt)
digest.update(key)
key.reset()
digest.digest(buffer, 0, hashSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 12a8368..1a43fdf 100644
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -101,7 +101,9 @@ class Message(val buffer: ByteBuffer) {
Message.KeySizeLength +
(if(key == null) 0 else key.length) +
Message.ValueSizeLength +
- (if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset)))
+ (if(bytes == null) 0
+ else if(payloadSize >= 0) payloadSize
+ else bytes.length - payloadOffset)))
// skip crc, we will fill that in at the end
buffer.position(MagicOffset)
buffer.put(CurrentMagicValue)
@@ -115,9 +117,12 @@ class Message(val buffer: ByteBuffer) {
buffer.putInt(key.length)
buffer.put(key, 0, key.length)
}
- val size = if(payloadSize >= 0) payloadSize else bytes.length - payloadOffset
+ val size = if(bytes == null) -1
+ else if(payloadSize >= 0) payloadSize
+ else bytes.length - payloadOffset
buffer.putInt(size)
- buffer.put(bytes, payloadOffset, size)
+ if(bytes != null)
+ buffer.put(bytes, payloadOffset, size)
buffer.rewind()
// now compute the checksum and fill it in
@@ -186,6 +191,11 @@ class Message(val buffer: ByteBuffer) {
def payloadSize: Int = buffer.getInt(payloadSizeOffset)
/**
+ * Is the payload of this message null
+ */
+ def isNull(): Boolean = payloadSize < 0
+
+ /**
* The magic version of this message
*/
def magic: Byte = buffer.get(MagicOffset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 27b16e3..2e3e383 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -249,7 +249,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
if (logger.isTraceEnabled) {
val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
- trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
+ trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))
}
failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
.map(partitionStatus => partitionStatus._1)
@@ -257,8 +257,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
.format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
failedTopicPartitions
- } else
+ } else {
Seq.empty[TopicAndPartition]
+ }
} catch {
case t: Throwable =>
warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 006caa7..5e4c9ca 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -109,9 +109,6 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
- /* a per-topic override for the cleanup policy for segments beyond the retention window */
- val logCleanupPolicyMap = props.getMap("topic.log.cleanup.policy")
-
/* the number of background threads to use for log cleaning */
val logCleanerThreads = props.getIntInRange("log.cleaner.threads", 1, (0, Int.MaxValue))
@@ -119,11 +116,15 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
val logCleanerIoMaxBytesPerSecond = props.getDouble("log.cleaner.io.max.bytes.per.second", Double.MaxValue)
/* the total memory used for log deduplication across all cleaner threads */
- val logCleanerDedupeBufferSize = props.getIntInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024, (0, Int.MaxValue))
+ val logCleanerDedupeBufferSize = props.getLongInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024L, (0L, Long.MaxValue)) // NOTE(review): log.cleaner.threads may be 0 (its range starts at 0), which makes the require below divide by zero
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024*1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
/* the total memory used for log cleaner I/O buffers across all cleaner threads */
- val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 4*1024*1024, (0, Int.MaxValue))
+ val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 512*1024, (0, Int.MaxValue))
+
+ /* log cleaner dedupe buffer load factor: the fraction (e.g. 0.9) of the dedupe buffer that may be filled. A higher value
+ * will allow more log to be cleaned at once but will lead to more hash collisions */
+ val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d) // NOTE(review): not range-checked; values outside (0, 1] look invalid for a load factor — confirm
/* the amount of time to sleep when there are no logs to clean */
val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 30*1000, (0L, Long.MaxValue))
@@ -134,6 +135,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* should we enable log cleaning? */
val logCleanerEnable = props.getBoolean("log.cleaner.enable", false)
+ /* how long, in ms, delete records (messages with a null payload) are retained; must be non-negative */
+ val logCleanerDeleteRetentionMs = props.getLongInRange("log.cleaner.delete.retention.ms", 24 * 60 * 60 * 1000L, (0L, Long.MaxValue))
+
/* the maximum size in bytes of the offset index */
val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue))
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9fa432d..e2f4e91 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -163,6 +163,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
maxMessageSize = config.messageMaxBytes,
maxIndexSize = config.logIndexSizeMaxBytes,
indexInterval = config.logIndexIntervalBytes,
+ deleteRetentionMs = config.logCleanerDeleteRetentionMs, // read from "log.cleaner.delete.retention.ms"
fileDeleteDelayMs = config.logDeleteDelayMs,
minCleanableRatio = config.logCleanerMinCleanRatio,
dedupe = config.logCleanupPolicy.trim.toLowerCase == "dedupe")
@@ -171,6 +172,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
+ dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor, // read from "log.cleaner.io.buffer.load.factor"
ioBufferSize = config.logCleanerIoBufferSize,
maxMessageSize = config.messageMaxBytes,
maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 31333e7..d9546ca 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -140,8 +140,9 @@ object DumpLogSegments {
print(" keysize: " + msg.keySize)
if(printContents) {
if(msg.hasKey)
- print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
- print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8"))
+ print(" key: " + Utils.readString(messageAndOffset.message.key, "UTF-8")) // only reached when msg.hasKey
+ val payload = if(messageAndOffset.message.isNull) null else Utils.readString(messageAndOffset.message.payload, "UTF-8") // a null payload is printed as "null"
+ print(" payload: " + payload)
}
println()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index d8127a8..aa5e661 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -196,7 +196,7 @@ object SimpleConsumerShell extends Logging {
System.out.println("next offset = " + offset)
val message = messageAndOffset.message
val key = if(message.hasKey) Utils.readBytes(message.key) else null
- formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
+ formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out) // a null payload is handed to the formatter as null instead of being read
} catch {
case e =>
if (skipMessageOnError)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/utils/IteratorTemplate.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/IteratorTemplate.scala b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
index 301f934..fd952f3 100644
--- a/core/src/main/scala/kafka/utils/IteratorTemplate.scala
+++ b/core/src/main/scala/kafka/utils/IteratorTemplate.scala
@@ -32,16 +32,21 @@ object FAILED extends State
abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {
private var state: State = NOT_READY
- private var nextItem: Option[T] = None
+ private var nextItem = null.asInstanceOf[T] // the item returned by the most recent makeNext() call
def next(): T = {
if(!hasNext())
throw new NoSuchElementException()
state = NOT_READY
- nextItem match {
- case Some(item) => item
- case None => throw new IllegalStateException("Expected item but none found.")
- }
+ if(nextItem == null) // NOTE(review): this also rejects a legitimately null element, which the old Option-based code returned as Some(null) — confirm no subclass yields null
+ throw new IllegalStateException("Expected item but none found.")
+ nextItem
+ }
+
+ def peek(): T = { // returns the next item without consuming it; throws NoSuchElementException when exhausted
+ if(!hasNext())
+ throw new NoSuchElementException()
+ nextItem
}
def hasNext(): Boolean = {
@@ -58,7 +63,7 @@ abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T
def maybeComputeNext(): Boolean = {
state = FAILED
- nextItem = Some(makeNext())
+ nextItem = makeNext()
if(state == DONE) {
false
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/main/scala/kafka/utils/VerifiableProperties.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
index a2ac55c..9009a9d 100644
--- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala
+++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala
@@ -18,6 +18,7 @@
package kafka.utils
import java.util.Properties
+import java.util.Collections
import scala.collection._
class VerifiableProperties(val props: Properties) extends Logging {
@@ -194,9 +195,8 @@ class VerifiableProperties(val props: Properties) extends Logging {
def verify() {
info("Verifying properties")
- val specifiedProperties = props.propertyNames()
- while (specifiedProperties.hasMoreElements) {
- val key = specifiedProperties.nextElement().asInstanceOf[String]
+ val propNames = Collections.list(props.propertyNames).toArray().map(_.toString).sorted // all property names, sorted; avoids the version-dependent JavaConversions API
+ for(key <- propNames) {
if (!referenceSet.contains(key))
warn("Property %s is not valid".format(key))
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/other/kafka/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLogCleaning.scala b/core/src/test/scala/other/kafka/TestLogCleaning.scala
index d9c721b..0bef218 100644
--- a/core/src/test/scala/other/kafka/TestLogCleaning.scala
+++ b/core/src/test/scala/other/kafka/TestLogCleaning.scala
@@ -27,6 +27,8 @@ import kafka.producer._
import kafka.consumer._
import kafka.serializer._
import kafka.utils._
+import kafka.log.FileMessageSet
+import kafka.log.Log
/**
* This is a torture test that runs against an existing broker. Here is how it works:
@@ -66,6 +68,11 @@ object TestLogCleaning {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
+ val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
+ .withRequiredArg
+ .describedAs("percent")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(0) // NOTE(review): not range-checked; any value >= 100 makes every produced message a delete
val zkConnectOpt = parser.accepts("zk", "Zk url.")
.withRequiredArg
.describedAs("url")
@@ -75,10 +82,18 @@ object TestLogCleaning {
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
- val cleanupOpt = parser.accepts("cleanup", "Delete temp files when done.")
+ val dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.")
+ .withRequiredArg
+ .describedAs("directory")
+ .ofType(classOf[String])
val options = parser.parse(args:_*)
+ if(options.has(dumpOpt)) { // dump mode: print the given log directory and exit without producing or consuming
+ dumpLog(new File(options.valueOf(dumpOpt)))
+ System.exit(0)
+ }
+
if(!options.has(brokerOpt) || !options.has(zkConnectOpt) || !options.has(numMessagesOpt)) {
parser.printHelpOn(System.err)
System.exit(1)
@@ -86,83 +101,146 @@ object TestLogCleaning {
// parse options
val messages = options.valueOf(numMessagesOpt).longValue
+ val percentDeletes = options.valueOf(percentDeletesOpt).intValue
val dups = options.valueOf(numDupsOpt).intValue
val brokerUrl = options.valueOf(brokerOpt)
val topicCount = options.valueOf(topicsOpt).intValue
val zkUrl = options.valueOf(zkConnectOpt)
val sleepSecs = options.valueOf(sleepSecsOpt).intValue
- val cleanup = options.has(cleanupOpt)
val testId = new Random().nextInt(Int.MaxValue)
val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
println("Producing %d messages...".format(messages))
- val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, cleanup)
+ val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes)
println("Sleeping for %d seconds...".format(sleepSecs))
Thread.sleep(sleepSecs * 1000)
println("Consuming messages...")
- val consumedDataFile = consumeMessages(zkUrl, topics, cleanup)
+ val consumedDataFile = consumeMessages(zkUrl, topics)
val producedLines = lineCount(producedDataFile)
val consumedLines = lineCount(consumedDataFile)
val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
- println("Validating output files...")
- validateOutput(externalSort(producedDataFile), externalSort(consumedDataFile))
- println("All done.")
+ println("Deduplicating and validating output files...")
+ validateOutput(producedDataFile, consumedDataFile) // throws (via require) on a mismatch, so the files below are kept for inspection on failure
+ producedDataFile.delete()
+ consumedDataFile.delete()
+ }
+
+ def dumpLog(dir: File) { // print every message in the directory's log segment files, in file-name order
+ require(dir.exists, "Non-existant directory: " + dir.getAbsolutePath)
+ for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
+ val ms = new FileMessageSet(new File(dir, file))
+ for(entry <- ms) {
+ val key = if(entry.message.hasKey) Utils.readString(entry.message.key) else null // keyless messages print key = null instead of failing
+ val content =
+ if(entry.message.isNull)
+ null
+ else
+ Utils.readString(entry.message.payload)
+ println("offset = %s, key = %s, content = %s".format(entry.offset, key, content))
+ }
+ }
}
def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size
- def validateOutput(produced: BufferedReader, consumed: BufferedReader) {
- while(true) {
- val prod = readFinalValue(produced)
- val cons = readFinalValue(consumed)
- if(prod == null && cons == null) {
- return
- } else if(prod != cons) {
- System.err.println("Validation failed prod = %s, cons = %s!".format(prod, cons))
- System.exit(1)
- }
+ def validateOutput(producedDataFile: File, consumedDataFile: File) { // pairwise-compare, in sorted order, the final non-delete record per topic/key of each file
+ val producedReader = externalSort(producedDataFile)
+ val consumedReader = externalSort(consumedDataFile)
+ val produced = valuesIterator(producedReader)
+ val consumed = valuesIterator(consumedReader)
+ val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
+ val producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 1024*1024)
+ val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
+ val consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 1024*1024)
+ var total = 0
+ var mismatched = 0
+ while(produced.hasNext && consumed.hasNext) {
+ val p = produced.next()
+ producedDeduped.write(p.toString)
+ producedDeduped.newLine()
+ val c = consumed.next()
+ consumedDeduped.write(c.toString)
+ consumedDeduped.newLine()
+ if(p != c)
+ mismatched += 1
+ total += 1
}
+ producedDeduped.close()
+ consumedDeduped.close()
+ require(!produced.hasNext, "Additional values produced not found in consumer log.")
+ require(!consumed.hasNext, "Additional values consumed not found in producer log.")
+ println("Validated " + total + " values, " + mismatched + " mismatches.")
+ require(mismatched == 0, "Non-zero number of row mismatches.")
+ // if all the checks worked out we can delete the deduped files
+ producedDedupedFile.delete()
+ consumedDedupedFile.delete()
}
- def readFinalValue(reader: BufferedReader): (String, Int, Int) = {
- def readTuple() = {
- val line = reader.readLine
- if(line == null)
- null
- else
- line.split("\t")
+ def valuesIterator(reader: BufferedReader) = { // final record of each topic/key run, skipping runs whose final record is a delete
+ new IteratorTemplate[TestRecord] {
+ def makeNext(): TestRecord = {
+ var next = readNext(reader)
+ while(next != null && next.delete)
+ next = readNext(reader)
+ if(next == null)
+ allDone()
+ else
+ next
+ }
}
- var prev = readTuple()
- if(prev == null)
+ }
+
+ def readNext(reader: BufferedReader): TestRecord = { // last record of the next run of lines sharing a topic/key, or null at end of input
+ var line = reader.readLine()
+ if(line == null)
return null
+ var curr = new TestRecord(line)
while(true) {
- reader.mark(1024)
- val curr = readTuple()
- if(curr == null || curr(0) != prev(0) || curr(1) != prev(1)) {
- reader.reset()
- return (prev(0), prev(1).toInt, prev(2).toInt)
- } else {
- prev = curr
- }
+ line = peekLine(reader)
+ if(line == null)
+ return curr
+ val next = new TestRecord(line)
+ if(next.topicAndKey != curr.topicAndKey) // a new topic/key run starts on this line; leave it unconsumed
+ return curr
+ curr = next
+ reader.readLine()
}
- return null
+ null
+ }
+
+ def peekLine(reader: BufferedReader) = { // next line without consuming it; NOTE(review): reset() can fail for lines longer than the 4096-char mark limit
+ reader.mark(4096)
+ val line = reader.readLine
+ reader.reset()
+ line
}
def externalSort(file: File): BufferedReader = {
- val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", file.getAbsolutePath)
+ val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath)
val process = builder.start()
- new BufferedReader(new InputStreamReader(process.getInputStream()))
+ new Thread() { // on a non-zero exit status, echo whatever stderr output is already available
+ override def run() {
+ val exitCode = process.waitFor()
+ if(exitCode != 0) {
+ System.err.println("Process exited abnormally.")
+ while(process.getErrorStream.available > 0) {
+ System.err.write(process.getErrorStream().read())
+ }
+ }
+ }
+ }.start()
+ new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024)
}
def produceMessages(brokerUrl: String,
topics: Array[String],
messages: Long,
- dups: Int,
- cleanup: Boolean): File = {
+ dups: Int,
+ percentDeletes: Int): File = {
val producerProps = new Properties
producerProps.setProperty("producer.type", "async")
producerProps.setProperty("broker.list", brokerUrl)
@@ -174,36 +252,49 @@ object TestLogCleaning {
val rand = new Random(1)
val keyCount = (messages / dups).toInt
val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
- if(cleanup)
- producedFile.deleteOnExit()
+ println("Logging produce requests to " + producedFile.getAbsolutePath)
val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
for(i <- 0L until (messages * topics.length)) {
val topic = topics((i % topics.length).toInt)
val key = rand.nextInt(keyCount)
- producer.send(KeyedMessage(topic = topic, key = key.toString, message = i.toString))
- producedWriter.write("%s\t%s\t%s\n".format(topic, key, i))
+ val delete = i % 100 < percentDeletes // the first percentDeletes messages of every 100 are deletes
+ val msg =
+ if(delete)
+ KeyedMessage[String, String](topic = topic, key = key.toString, message = null) // a null message marks a delete
+ else
+ KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
+ producer.send(msg)
+ producedWriter.write(TestRecord(topic, key, i, delete).toString)
+ producedWriter.newLine()
}
producedWriter.close()
producer.close()
producedFile
}
- def consumeMessages(zkUrl: String, topics: Array[String], cleanup: Boolean): File = {
+ def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = { // NOTE(review): topics is not used in this method
val consumerProps = new Properties
consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
consumerProps.setProperty("zk.connect", zkUrl)
- consumerProps.setProperty("consumer.timeout.ms", (5*1000).toString)
- val connector = new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
+ consumerProps.setProperty("consumer.timeout.ms", (10*1000).toString)
+ new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
+ }
+
+ def consumeMessages(zkUrl: String, topics: Array[String]): File = {
+ val connector = makeConsumer(zkUrl, topics)
val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
- if(cleanup)
- consumedFile.deleteOnExit()
+ println("Logging consumed messages to " + consumedFile.getAbsolutePath)
val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
for(topic <- topics) {
val stream = streams(topic).head
try {
- for(item <- stream)
- consumedWriter.write("%s\t%s\t%s\n".format(topic, item.key, item.message))
+ for(item <- stream) {
+ val delete = item.message == null
+ val value = if(delete) -1L else item.message.toLong // deletes are recorded with value -1
+ consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString)
+ consumedWriter.newLine()
+ }
} catch {
case e: ConsumerTimeoutException =>
}
@@ -213,4 +304,11 @@ object TestLogCleaning {
consumedFile
}
+}
+// One produced/consumed record, serialized as the tab-separated line "topic key value d|u" (d = delete).
+case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
+ def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
+ def this(line: String) = this(line.split("\t"))
+ override def toString() = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u")
+ def topicAndKey = topic + "\t" + key // delimited so that e.g. topic "t-1" + key 23 cannot equal topic "t-12" + key 3
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index cce2319..4619d86 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -52,7 +52,7 @@ class CleanerTest extends JUnitSuite {
// append messages to the log until we have four segments
while(log.numberOfSegments < 4)
- log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
+ log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) // one message per offset, keyed (and valued) by that offset
val keysFound = keysInLog(log)
assertEquals((0L until log.logEndOffset), keysFound)
@@ -62,14 +62,38 @@ class CleanerTest extends JUnitSuite {
keys.foreach(k => map.put(key(k), Long.MaxValue))
// clean the log
- cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
+ cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L) // NOTE(review): meaning of the new trailing 0L argument is not visible here — confirm against Cleaner.cleanSegments
val shouldRemain = keysInLog(log).filter(!keys.contains(_))
assertEquals(shouldRemain, keysInLog(log))
}
+ @Test
+ def testCleaningWithDeletes() { // keys overwritten by a later null-payload (delete) message must not survive cleaning
+ val cleaner = makeCleaner(Int.MaxValue)
+ val log = makeLog(config = logConfig.copy(segmentSize = 1024))
+
+ // append messages with the keys 0 through N
+ while(log.numberOfSegments < 2)
+ log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+ // delete all even keys between 0 and N
+ val leo = log.logEndOffset
+ for(key <- 0 until leo.toInt by 2)
+ log.append(deleteMessage(key))
+
+ // append some new unique keys to pad out to a new active segment
+ while(log.numberOfSegments < 4)
+ log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+ cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
+ val keys = keysInLog(log).toSet
+ assertTrue("None of the keys we deleted should still exist.",
+ (0 until leo.toInt by 2).forall(!keys.contains(_)))
+ }
+
/* extract all the keys from a log */
def keysInLog(log: Log): Iterable[Int] =
- log.logSegments.flatMap(s => s.log.map(m => Utils.readString(m.message.key).toInt))
+ log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt)) // null-payload (delete) messages are excluded
/**
@@ -82,14 +106,14 @@ class CleanerTest extends JUnitSuite {
// append messages to the log until we have four segments
while(log.numberOfSegments < 2)
- log.append(messages(log.logEndOffset.toInt, log.logEndOffset.toInt))
+ log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) // one message per offset, keyed (and valued) by that offset
log.truncateTo(log.logEndOffset-2)
val keys = keysInLog(log)
val map = new FakeOffsetMap(Int.MaxValue)
keys.foreach(k => map.put(key(k), Long.MaxValue))
intercept[OptimisticLockFailureException] {
- cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0)
+ cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L)
}
}
@@ -170,40 +194,34 @@ class CleanerTest extends JUnitSuite {
checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
}
- /**
- * Test that we don't exceed the maximum capacity of the offset map, that is that an offset map
- * with a max size of 1000 will only clean 1000 new entries even if more than that are available.
- */
- @Test
- def testBuildOffsetMapOverCapacity() {
- val map = new FakeOffsetMap(1000)
- val log = makeLog()
- val cleaner = makeCleaner(Int.MaxValue)
- val vals = 0 until 1001
- val offsets = writeToLog(log, vals zip vals)
- val lastOffset = cleaner.buildOffsetMap(log, vals.start, vals.end, map)
- assertEquals("Shouldn't go beyond the capacity of the offset map.", 1000, lastOffset)
- }
-
def makeLog(dir: File = dir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, needsRecovery = false, scheduler = time.scheduler, time = time)
def makeCleaner(capacity: Int) =
- new Cleaner(id = 0, new FakeOffsetMap(capacity), ioBufferSize = 64*1024, maxIoBufferSize = 64*1024, throttler = throttler, time = time)
+ new Cleaner(id = 0, // cleaner backed by a FakeOffsetMap with the given number of slots
+ offsetMap = new FakeOffsetMap(capacity),
+ ioBufferSize = 64*1024,
+ maxIoBufferSize = 64*1024,
+ dupBufferLoadFactor = 0.75,
+ throttler = throttler,
+ time = time)
def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
for((key, value) <- seq)
- yield log.append(messages(key, value)).firstOffset
+ yield log.append(message(key, value)).firstOffset
}
def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
- def messages(key: Int, value: Int) =
+ def message(key: Int, value: Int) = // a single keyed message whose key and payload are the decimal strings of key and value
new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=value.toString.getBytes))
+ def deleteMessage(key: Int) = // a single keyed message with a null payload (a delete)
+ new ByteBufferMessageSet(new Message(key=key.toString.getBytes, bytes=null))
+
}
-class FakeOffsetMap(val capacity: Int) extends OffsetMap {
+class FakeOffsetMap(val slots: Int) extends OffsetMap {
val map = new java.util.HashMap[String, Long]()
private def keyFor(key: ByteBuffer) =
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 23e0e65..5658ed4 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -569,4 +569,17 @@ class LogTest extends JUnitSuite {
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
}
+ @Test
+ def testAppendMessageWithNullPayload() { // a message with a null payload must append and read back with isNull set
+ val log = new Log(logDir,
+ LogConfig(),
+ needsRecovery = false,
+ scheduler = time.scheduler,
+ time = time)
+ log.append(new ByteBufferMessageSet(new Message(bytes = null)))
+ val ms = log.read(0, 4096, None)
+ assertEquals(0L, ms.head.offset)
+ assertTrue("Message payload should be null.", ms.head.message.isNull)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
index 99a0c4b..12ce39e 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -29,11 +29,12 @@ class OffsetMapTest extends JUnitSuite {
validateMap(10)
validateMap(100)
validateMap(1000)
+ validateMap(5000)
}
@Test
def testClear() {
- val map = new SkimpyOffsetMap(4000, 0.75)
+ val map = new SkimpyOffsetMap(4000)
for(i <- 0 until 10)
map.put(key(i), i)
for(i <- 0 until 10)
@@ -43,45 +44,33 @@ class OffsetMapTest extends JUnitSuite {
assertEquals(map.get(key(i)), -1L)
}
- @Test
- def testCapacity() {
- val map = new SkimpyOffsetMap(1024, 0.75)
- var i = 0
- while(map.size < map.capacity) {
- map.put(key(i), i)
- i += 1
- }
- // now the map is full, it should throw an exception
- intercept[IllegalStateException] {
- map.put(key(i), i)
- }
- }
-
def key(key: Int) = ByteBuffer.wrap(key.toString.getBytes)
- def validateMap(items: Int) {
- val map = new SkimpyOffsetMap(items * 2 * 24, 0.75)
+ def validateMap(items: Int, loadFactor: Double = 0.5): SkimpyOffsetMap = { // fills a map with `items` keys, checks every lookup, and returns it for inspection
+ val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
for(i <- 0 until items)
map.put(key(i), i)
var misses = 0
- for(i <- 0 until items) {
- map.get(key(i)) match {
- case -1L => misses += 1
- case offset => assertEquals(i.toLong, offset)
- }
- }
- println("Miss rate: " + (misses.toDouble / items))
+ for(i <- 0 until items)
+ assertEquals(i.toLong, map.get(key(i))) // expected value first, so failure messages read correctly
+ map
}
}
object OffsetMapTest {
def main(args: Array[String]) {
- if(args.length != 1) {
- System.err.println("USAGE: java OffsetMapTest size")
+ if(args.length != 2) {
+ System.err.println("USAGE: java OffsetMapTest size load")
System.exit(1)
}
val test = new OffsetMapTest()
- test.validateMap(args(0).toInt)
+ val size = args(0).toInt
+ val load = args(1).toDouble
+ val start = System.nanoTime
+ val map = test.validateMap(size, load)
+ val elapsedMs = (System.nanoTime - start) / 1000.0 / 1000.0
+ println(map.size + " entries in map of size " + map.slots + " in " + elapsedMs + " ms")
+ println("Collision rate: %.1f%%".format(100*map.collisionRate))
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 883442d..4837585 100644
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -38,7 +38,7 @@ class MessageTest extends JUnitSuite {
@Before
def setUp(): Unit = {
val keys = Array(null, "key".getBytes, "".getBytes)
- val vals = Array("value".getBytes, "".getBytes)
+ val vals = Array("value".getBytes, "".getBytes, null) // null covers messages with a null payload
val codecs = Array(NoCompressionCodec, GZIPCompressionCodec)
for(k <- keys; v <- vals; codec <- codecs)
messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
@@ -47,7 +47,12 @@ class MessageTest extends JUnitSuite {
@Test
def testFieldValues {
for(v <- messages) {
- TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload)
+ if(v.payload == null) { // a null payload must read back with isNull set and a null payload buffer
+ assertTrue(v.message.isNull)
+ assertEquals("Payload should be null", null, v.message.payload)
+ } else {
+ TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload)
+ }
assertEquals(Message.CurrentMagicValue, v.message.magic)
if(v.message.hasKey)
TestUtils.checkEquals(ByteBuffer.wrap(v.key), v.message.key)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index e4b057e..507e6a8 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -288,7 +288,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
case e: FailedToSendMessageException => /* success */
case e: Exception => fail("Not expected", e)
} finally {
- producer.close
+ producer.close()
}
val t2 = SystemTime.milliseconds
@@ -296,5 +296,28 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
// we do this because the DefaultEventHandler retries a number of times
assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries)
}
+
+ @Test
+ def testSendNullMessage() { // sending a keyed message with a null payload through the String producer
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+ props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+
+ val config = new ProducerConfig(props)
+ val producer = new Producer[String, String](config)
+ try {
+
+ // create topic
+ AdminUtils.createTopic(zkClient, "new-topic", 2, 1)
+ assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
+ AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+
+ producer.send(new KeyedMessage[String, String]("new-topic", "key", null)) // the test passes if this does not throw; nothing is read back
+ } finally {
+ producer.close()
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9ff4e8eb/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
new file mode 100644
index 0000000..02054c6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
@@ -0,0 +1,41 @@
+package kafka.utils
+
+import junit.framework.Assert._
+import org.scalatest.Assertions
+import org.junit.{Test, After, Before}
+
+class IteratorTemplateTest extends Assertions {
+
+ val lst = (0 until 10).toSeq
+ val iterator = new IteratorTemplate[Int]() { // yields the elements of lst in order, then calls allDone()
+ var i = 0
+ override def makeNext() = {
+ if(i >= lst.size) {
+ allDone()
+ } else {
+ val item = lst(i)
+ i += 1
+ item
+ }
+ }
+ }
+
+ @Test
+ def testIterator() { // hasNext and peek must not advance the iterator; next must; both throw once exhausted
+ for(i <- 0 until 10) {
+ assertEquals("We should have an item to read.", true, iterator.hasNext)
+ assertEquals("Checking again shouldn't change anything.", true, iterator.hasNext)
+ assertEquals("Peeking at the item should show the right thing.", i, iterator.peek)
+ assertEquals("Peeking again shouldn't change anything", i, iterator.peek)
+ assertEquals("Getting the item should give the right thing.", i, iterator.next)
+ }
+ assertEquals("All gone!", false, iterator.hasNext)
+ intercept[NoSuchElementException] {
+ iterator.peek
+ }
+ intercept[NoSuchElementException] {
+ iterator.next
+ }
+ }
+
+}
\ No newline at end of file