Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/12 03:26:37 UTC
[11/11] git commit: merge from 0.8 and resolve conflicts
Updated Branches:
refs/heads/trunk ed36a7f07 -> 9249b76d1
merge from 0.8 and resolve conflicts
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9249b76d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9249b76d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9249b76d
Branch: refs/heads/trunk
Commit: 9249b76d1d04f2583843cf5fe09ba8bbdf611183
Parents: ed36a7f a409531
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Jan 11 18:25:56 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 11 18:25:56 2013 -0800
----------------------------------------------------------------------
config/consumer.properties | 2 +-
config/log4j.properties | 45 +++++++--
config/producer.properties | 25 +----
config/server.properties | 34 +++----
.../main/java/kafka/etl/impl/DataGenerator.java | 2 +-
.../kafka/bridge/hadoop/KafkaOutputFormat.java | 3 +-
.../main/scala/kafka/api/LeaderAndIsrRequest.scala | 5 +-
.../src/main/scala/kafka/api/ProducerRequest.scala | 1 -
.../main/scala/kafka/api/StopReplicaRequest.scala | 4 +-
.../scala/kafka/api/TopicMetadataRequest.scala | 20 ++--
core/src/main/scala/kafka/client/ClientUtils.scala | 15 ++-
core/src/main/scala/kafka/cluster/Partition.scala | 10 +-
.../scala/kafka/consumer/ConsoleConsumer.scala | 18 ++--
.../main/scala/kafka/consumer/ConsumerConfig.scala | 32 +++---
.../kafka/consumer/ConsumerFetcherThread.scala | 10 +-
.../consumer/ZookeeperConsumerConnector.scala | 20 ++--
.../controller/ControllerChannelManager.scala | 6 +-
.../scala/kafka/controller/KafkaController.scala | 4 +-
.../kafka/controller/PartitionStateMachine.scala | 5 +-
.../kafka/controller/ReplicaStateMachine.scala | 2 +-
.../scala/kafka/javaapi/TopicMetadataRequest.scala | 11 ++-
core/src/main/scala/kafka/log/FileMessageSet.scala | 5 +-
core/src/main/scala/kafka/log/Log.scala | 18 +++-
core/src/main/scala/kafka/log/LogManager.scala | 36 ++++----
core/src/main/scala/kafka/log/OffsetIndex.scala | 8 +-
.../main/scala/kafka/network/RequestChannel.scala | 11 ++-
.../main/scala/kafka/network/SocketServer.scala | 2 +-
.../scala/kafka/producer/BrokerPartitionInfo.scala | 8 +-
.../scala/kafka/producer/ConsoleProducer.scala | 61 +++++++++---
.../scala/kafka/producer/DefaultPartitioner.scala | 5 +-
.../scala/kafka/producer/KafkaLog4jAppender.scala | 4 +-
core/src/main/scala/kafka/producer/Producer.scala | 12 +-
.../main/scala/kafka/producer/ProducerConfig.scala | 29 ++++--
.../main/scala/kafka/producer/SyncProducer.scala | 2 +-
.../scala/kafka/producer/SyncProducerConfig.scala | 10 +-
.../kafka/producer/async/AsyncProducerConfig.scala | 8 +-
.../kafka/producer/async/DefaultEventHandler.scala | 76 ++++++++++-----
.../scala/kafka/server/AbstractFetcherThread.scala | 6 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 54 ++++++----
core/src/main/scala/kafka/server/KafkaConfig.scala | 74 +++++++-------
core/src/main/scala/kafka/server/KafkaServer.scala | 4 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 8 +-
.../main/scala/kafka/server/ReplicaManager.scala | 6 +-
.../scala/kafka/tools/ConsumerOffsetChecker.scala | 65 ++++++-------
.../main/scala/kafka/tools/KafkaMigrationTool.java | 4 +-
.../main/scala/kafka/tools/ReplayLogProducer.scala | 14 ++--
core/src/main/scala/kafka/utils/ZkUtils.scala | 6 +-
.../scala/other/kafka/TestEndToEndLatency.scala | 4 +-
.../scala/other/kafka/TestLogPerformance.scala | 2 +-
.../scala/other/kafka/TestZKConsumerOffsets.scala | 2 +-
.../api/RequestResponseSerializationTest.scala | 6 +-
.../kafka/integration/AutoOffsetResetTest.scala | 4 +-
.../integration/ProducerConsumerTestHarness.scala | 8 +-
.../unit/kafka/integration/TopicMetadataTest.scala | 33 +++++--
.../test/scala/unit/kafka/log/LogManagerTest.scala | 18 ++--
core/src/test/scala/unit/kafka/log/LogTest.scala | 56 ++++++------
.../unit/kafka/producer/AsyncProducerTest.scala | 20 ++--
.../scala/unit/kafka/producer/ProducerTest.scala | 16 ++--
.../unit/kafka/producer/SyncProducerTest.scala | 17 ++--
.../unit/kafka/server/ISRExpirationTest.scala | 10 +-
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
.../scala/unit/kafka/server/LogOffsetTest.scala | 6 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 8 +-
.../scala/unit/kafka/server/SimpleFetchTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 22 ++--
.../src/main/java/kafka/examples/Consumer.java | 8 +-
.../scala/kafka/perf/ConsumerPerformance.scala | 12 +-
.../scala/kafka/perf/ProducerPerformance.scala | 15 ++--
.../config/mirror_producer.properties | 4 +-
.../config/mirror_producer1.properties | 4 +-
.../config/mirror_producer2.properties | 4 +-
.../config/mirror_producer3.properties | 4 +-
.../config/server_source1.properties | 20 ++--
.../config/server_source2.properties | 20 ++--
.../config/server_source3.properties | 20 ++--
.../config/server_source4.properties | 20 ++--
.../config/server_target1.properties | 20 ++--
.../config/server_target2.properties | 20 ++--
.../config/server_target3.properties | 20 ++--
.../config/whitelisttest.consumer.properties | 6 +-
system_test/common/util.sh | 8 +-
.../config/migration_producer.properties | 29 +-----
.../config/server.properties | 36 ++++----
.../config/blacklisttest.consumer.properties | 6 +-
.../mirror_maker/config/mirror_producer.properties | 4 +-
.../config/server_source_1_1.properties | 20 ++--
.../config/server_source_1_2.properties | 20 ++--
.../config/server_source_2_1.properties | 20 ++--
.../config/server_source_2_2.properties | 20 ++--
.../config/server_target_1_1.properties | 20 ++--
.../config/server_target_1_2.properties | 20 ++--
.../config/whitelisttest_1.consumer.properties | 6 +-
.../config/whitelisttest_2.consumer.properties | 6 +-
.../config/mirror_consumer.properties | 20 ++--
.../config/mirror_producer.properties | 2 +-
.../config/server.properties | 56 ++++++------
system_test/producer_perf/config/server.properties | 20 ++--
.../replication_testsuite/config/server.properties | 56 ++++++------
98 files changed, 855 insertions(+), 759 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index b813b12,42a9628..cca6a11
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@@ -112,14 -112,11 +112,14 @@@ private[kafka] class ZookeeperConsumerC
connectZk()
createFetcher()
- if (config.autoCommit) {
+ if (config.autoCommitEnable) {
scheduler.startup
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
- scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
- config.autoCommitIntervalMs, false)
+ scheduler.schedule("kafka-consumer-autocommit",
+ autoCommit,
+ delay = config.autoCommitIntervalMs,
+ period = config.autoCommitIntervalMs,
+ unit = TimeUnit.MILLISECONDS)
}
KafkaMetricsReporter.startReporters(config.props)
@@@ -163,8 -160,8 +163,8 @@@
if (wildcardTopicWatcher != null)
wildcardTopicWatcher.shutdown()
try {
- if (config.autoCommit)
+ if (config.autoCommitEnable)
- scheduler.shutdownNow()
+ scheduler.shutdown()
fetcher match {
case Some(f) => f.shutdown
case None =>
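The consumer connector now uses the unified scheduler API: a named task with an explicit initial delay, period, and time unit, replacing scheduleWithRate. A minimal stand-alone approximation of that schedule() shape (the real KafkaScheduler wraps a ScheduledThreadPoolExecutor; this sketch is illustrative only, not this tree's implementation):

import java.util.concurrent.{Executors, TimeUnit}

class SketchScheduler(threads: Int) {
  private val executor = Executors.newScheduledThreadPool(threads)

  // Periodic task with an initial delay, matching the call sites in this commit.
  def schedule(name: String, fun: () => Unit,
               delay: Long, period: Long, unit: TimeUnit) {
    executor.scheduleAtFixedRate(new Runnable { def run() { fun() } },
                                 delay, period, unit)
  }

  // shutdown() lets an in-flight task (such as a final offset commit) complete;
  // shutdownNow() would interrupt it, which is presumably why the shutdown path
  // above switched methods.
  def shutdown() { executor.shutdown() }
}

Usage mirrors the hunk above, e.g. scheduler.schedule("kafka-consumer-autocommit", autoCommit, delay = intervalMs, period = intervalMs, unit = TimeUnit.MILLISECONDS).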
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/FileMessageSet.scala
index 5284026,5845bb6..37e8d87
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@@ -40,38 -36,28 +40,41 @@@ import kafka.metrics.{KafkaTimer, Kafka
@nonthreadsafe
class FileMessageSet private[kafka](val file: File,
private[log] val channel: FileChannel,
- private[log] val start: Int = 0,
- private[log] val limit: Int = Int.MaxValue,
- initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging {
+ private[log] val start: Int,
+ private[log] val end: Int,
+ isSlice: Boolean) extends MessageSet with Logging {
/* the size of the message set in bytes */
- private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
+ private val _size =
+ if(isSlice)
+ new AtomicInteger(end - start) // don't check the file size if this is just a slice view
+ else
+ new AtomicInteger(math.min(channel.size().toInt, end) - start)
- if (initChannelPositionToEnd) {
+ /* if this is not a slice, update the file pointer to the end of the file */
- if (!isSlice)
++ if (!isSlice) {
+ info("Creating or reloading log segment %s".format(file.getAbsolutePath))
+ /* set the file position to the last byte in the file */
channel.position(channel.size)
+ }
/**
- * Create a file message set with no limit or offset
+ * Create a file message set with no slicing.
*/
- def this(file: File, channel: FileChannel) = this(file, channel, 0, Int.MaxValue)
+ def this(file: File, channel: FileChannel) =
+ this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)
/**
- * Create a file message set with no limit or offset
+ * Create a file message set with no slicing
*/
- def this(file: File) = this(file, Utils.openChannel(file, mutable = true))
+ def this(file: File) =
+ this(file, Utils.openChannel(file, mutable = true))
+
+ /**
+ * Create a slice view of the file message set that begins and ends at the given byte offsets
+ */
+ def this(file: File, channel: FileChannel, start: Int, end: Int) =
+ this(file, channel, start, end, isSlice = true)
/**
* Return a message set which is a view into this set starting from the given position and with the given size limit.
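The constructor change above replaces the old (start, limit) pair with explicit (start, end) byte bounds plus an isSlice flag, so a slice view can skip the file-size check. A hypothetical sketch of the slice arithmetic the new four-argument constructor supports (parameter names here are illustrative, not the real API):

def sliceOf(file: java.io.File, channel: java.nio.channels.FileChannel,
            parentStart: Int, parentSizeBytes: Int,
            position: Int, size: Int): FileMessageSet = {
  val viewStart = parentStart + position                 // absolute file offset
  val viewEnd   = math.min(viewStart + size, parentStart + parentSizeBytes)
  new FileMessageSet(file, channel, viewStart, viewEnd)  // the isSlice = true constructor
}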
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index 1654dbf,79db610..f6a32b6
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -366,52 -428,59 +366,58 @@@ class Log(val dir: File
def logEndOffset: Long = nextOffset.get
/**
- * Roll the log over if necessary
+ * Roll the log over to a new empty log segment if necessary
+ * @return The currently active segment after (perhaps) rolling to a new segment
*/
- private def maybeRoll(segment: LogSegment): LogSegment = {
- if(segment.messageSet.sizeInBytes > maxLogFileSize) {
+ private def maybeRoll(): LogSegment = {
+ val segment = activeSegment
- if ((segment.size > maxSegmentSize) ||
- (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) ||
- segment.index.isFull)
++ if (segment.size > maxSegmentSize) {
+ info("Rolling %s due to full data log".format(name))
roll()
- else
- } else if((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) {
++ } else if (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) {
+ info("Rolling %s due to time based rolling".format(name))
+ roll()
- } else if(segment.index.isFull) {
++ } else if (segment.index.isFull) {
+ info("Rolling %s due to full index maxIndexSize = %d, entries = %d, maxEntries = %d"
+ .format(name, segment.index.maxIndexSize, segment.index.entries(), segment.index.maxEntries))
+ roll()
+ } else
segment
}
-
+
/**
- * Create a new segment and make it active, and return it
+ * Roll the log over to a new active segment starting with the current logEndOffset.
+ * This will trim the index to the exact size of the number of entries it currently contains.
+ * @return The newly rolled segment
*/
def roll(): LogSegment = {
lock synchronized {
- flush()
- rollToOffset(logEndOffset)
- }
- }
+ // flush the log to ensure that only the active segment needs to be recovered
+ if(!segments.isEmpty())
+ flush()
- /**
- * Roll the log over to the given new offset value
- */
- private def rollToOffset(newOffset: Long): LogSegment = {
- val logFile = logFilename(dir, newOffset)
- val indexFile = indexFilename(dir, newOffset)
- for(file <- List(logFile, indexFile); if file.exists) {
- warn("Newly rolled segment file " + file.getAbsolutePath + " already exists; deleting it first")
- file.delete()
- }
- info("Rolling log '" + name + "' to " + logFile.getAbsolutePath + " and " + indexFile.getAbsolutePath)
- segments.view.lastOption match {
- case Some(segment) => segment.index.trimToValidSize()
- case None =>
+ val newOffset = logEndOffset
+ val logFile = logFilename(dir, newOffset)
+ val indexFile = indexFilename(dir, newOffset)
+ for(file <- List(logFile, indexFile); if file.exists) {
+ warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
+ file.delete()
+ }
+
- debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
++ info("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
+ segments.lastEntry() match {
+ case null =>
+ case entry => entry.getValue.index.trimToValidSize()
+ }
+ val segment = new LogSegment(dir,
+ startOffset = newOffset,
+ indexIntervalBytes = indexIntervalBytes,
+ maxIndexSize = maxIndexSize)
+ val prev = segments.put(segment.baseOffset, segment)
+ if(prev != null)
+ throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(dir.getName, newOffset))
+ segment
}
-
- val segmentsView = segments.view
- if(segmentsView.size > 0 && segmentsView.last.start == newOffset)
- throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
-
- val segment = new LogSegment(dir,
- startOffset = newOffset,
- indexIntervalBytes = indexIntervalBytes,
- maxIndexSize = maxIndexSize)
- segments.append(segment)
- segment
}
/**
@@@ -439,34 -507,104 +445,34 @@@
}
}
- def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
- val segsArray = segments.view
- var offsetTimeArray: Array[(Long, Long)] = null
- if(segsArray.last.size > 0)
- offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
- else
- offsetTimeArray = new Array[(Long, Long)](segsArray.length)
-
- for(i <- 0 until segsArray.length)
- offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified)
- if(segsArray.last.size > 0)
- offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)
-
- var startIndex = -1
- timestamp match {
- case OffsetRequest.LatestTime =>
- startIndex = offsetTimeArray.length - 1
- case OffsetRequest.EarliestTime =>
- startIndex = 0
- case _ =>
- var isFound = false
- debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
- startIndex = offsetTimeArray.length - 1
- while (startIndex >= 0 && !isFound) {
- if (offsetTimeArray(startIndex)._2 <= timestamp)
- isFound = true
- else
- startIndex -=1
- }
- }
-
- val retSize = maxNumOffsets.min(startIndex + 1)
- val ret = new Array[Long](retSize)
- for(j <- 0 until retSize) {
- ret(j) = offsetTimeArray(startIndex)._1
- startIndex -= 1
- }
- // ensure that the returned seq is in descending order of offsets
- ret.toSeq.sortBy(- _)
- }
-
+ /**
+ * Completely delete this log directory and all contents from the file system with no delay
+ */
def delete(): Unit = {
- deleteSegments(segments.contents.get())
+ logSegments.foreach(_.delete())
Utils.rm(dir)
}
-
+
-
- /* Attempts to delete all provided segments from a log and returns how many it was able to */
- def deleteSegments(segments: Seq[LogSegment]): Int = {
- var total = 0
- for(segment <- segments) {
- info("Deleting log segment " + segment.start + " from " + name)
- val deletedLog = segment.messageSet.delete()
- val deletedIndex = segment.index.delete()
- if(!deletedIndex || !deletedLog) {
- throw new KafkaStorageException("Deleting log segment " + segment.start + " failed.")
- } else {
- total += 1
- }
- if(segment.messageSet.file.exists())
- error("Data log file %s still exists".format(segment.messageSet.file.getAbsolutePath))
- if(segment.index.file.exists())
- error("Index file %s still exists".format(segment.index.file.getAbsolutePath))
- }
- total
- }
-
+ /**
+ * Truncate this log so that it ends with the greatest offset < targetOffset.
+ * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
+ */
def truncateTo(targetOffset: Long) {
+ info("Truncating log %s to offset %d.".format(name, targetOffset))
if(targetOffset < 0)
throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
+ if(targetOffset > logEndOffset) {
+ info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
+ return
+ }
lock synchronized {
- val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
- val viewSize = segments.view.size
- val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
- /* We should not hit this error because segments.view is locked in markedDeletedWhile() */
- if(numSegmentsDeleted != segmentsToBeDeleted.size)
- error("Failed to delete some segments when attempting to truncate to offset " + targetOffset +")")
- if (numSegmentsDeleted == viewSize) {
- segments.trunc(segments.view.size)
- rollToOffset(targetOffset)
- this.nextOffset.set(targetOffset)
+ if(segments.firstEntry.getValue.baseOffset > targetOffset) {
+ truncateFullyAndStartAt(targetOffset)
} else {
- if(targetOffset > logEndOffset) {
- error("Target offset %d cannot be greater than the last message offset %d in the log %s".
- format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath))
- } else {
- // find the log segment that has this hw
- val segmentToBeTruncated = findRange(segments.view, targetOffset)
- segmentToBeTruncated match {
- case Some(segment) =>
- val truncatedSegmentIndex = segments.view.indexOf(segment)
- segments.truncLast(truncatedSegmentIndex)
- segment.truncateTo(targetOffset)
- this.nextOffset.set(targetOffset)
- info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset))
- case None => // nothing to do
- }
- }
+ val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
+ deletable.foreach(deleteSegment(_))
+ activeSegment.truncateTo(targetOffset)
+ this.nextOffset.set(targetOffset)
}
}
}
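The rewritten maybeRoll() above rolls the active segment for any of three reasons, each now logged separately. Condensed into a single predicate (a sketch; field names as they appear in the diff):

def shouldRoll(segment: LogSegment, nowMs: Long,
               maxSegmentSize: Int, rollIntervalMs: Long): Boolean =
  segment.size > maxSegmentSize ||                                  // data log full
  (segment.size > 0 && nowMs - segment.created > rollIntervalMs) || // time-based roll
  segment.index.isFull                                              // offset index full

truncateTo() follows the same structure: segments whose base offset lies strictly above the target are deleted whole, and only the remaining active segment is truncated in place.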
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/LogManager.scala
index 2ea9afe,497cfdd..c5ab8a2
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@@ -42,17 -42,16 +42,17 @@@ class LogManager(val config: KafkaConfi
val CleanShutdownFile = ".kafka_cleanshutdown"
val LockFile = ".lock"
+ val InitialTaskDelayMs = 30*1000
val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
- private val logFileSizeMap = config.logFileSizeMap
- private val logFlushInterval = config.flushInterval
- private val logFlushIntervals = config.flushIntervalMap
+ private val logFileSizeMap = config.logSegmentBytesPerTopicMap
+ private val logFlushInterval = config.logFlushIntervalMessages
+ private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap
private val logCreationLock = new Object
- private val logRetentionSizeMap = config.logRetentionSizeMap
- private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
- private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
+ private val logRetentionSizeMap = config.logRetentionBytesPerTopicMap
+ private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
+ private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
- private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
+ private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
@@@ -115,18 -111,17 +115,18 @@@
info("Loading log '" + dir.getName + "'")
val topicPartition = parseTopicPartitionName(dir.getName)
val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
- val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
+ val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentBytes)
- val log = new Log(dir,
- maxLogFileSize,
+ val log = new Log(dir,
+ scheduler,
- maxLogFileSize,
- config.maxMessageSize,
++ maxLogFileSize,
+ config.messageMaxBytes,
logFlushInterval,
rollIntervalMs,
needsRecovery,
- config.logIndexMaxSizeBytes,
+ config.logIndexSizeMaxBytes,
config.logIndexIntervalBytes,
- time,
- config.brokerId)
+ config.logDeleteDelayMs,
+ time)
val previous = this.logs.put(topicPartition, log)
if(previous != null)
throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
@@@ -142,19 -137,12 +142,19 @@@
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
- info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
- scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
- info("Starting log flusher every " + config.logFlushSchedulerIntervalMs +
- " ms with the following overrides " + logFlushIntervals)
- scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
- config.logFlushSchedulerIntervalMs, config.logFlushSchedulerIntervalMs, false)
+ info("Starting log cleanup with a period of %d ms.".format(logCleanupIntervalMs))
+ scheduler.schedule("kafka-log-cleaner",
+ cleanupLogs,
+ delay = InitialTaskDelayMs,
+ period = logCleanupIntervalMs,
+ TimeUnit.MILLISECONDS)
+ info("Starting log flusher with a default period of %d ms with the following overrides: %s."
- .format(config.defaultFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", ")))
++ .format(config.logFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", ")))
+ scheduler.schedule("kafka-log-flusher",
+ flushDirtyLogs,
+ delay = InitialTaskDelayMs,
- period = config.flushSchedulerThreadRate,
++ period = config.logFlushSchedulerIntervalMs,
+ TimeUnit.MILLISECONDS)
}
}
@@@ -198,18 -186,17 +198,18 @@@
val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
dir.mkdirs()
val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
- val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
+ val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes)
log = new Log(dir,
+ scheduler,
maxLogFileSize,
- config.maxMessageSize,
+ config.messageMaxBytes,
logFlushInterval,
rollIntervalMs,
needsRecovery = false,
- config.logIndexMaxSizeBytes,
+ config.logIndexSizeMaxBytes,
config.logIndexIntervalBytes,
- time,
- config.brokerId)
+ config.logDeleteDelayMs,
+ time)
info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
logs.put(topicAndPartition, log)
log
@@@ -252,9 -249,8 +252,9 @@@
*/
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
val topic = parseTopicPartitionName(log.dir.getName).topic
- val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
+ val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionBytes)
- if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
+ if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize)
+ return 0
var diff = log.size - maxLogRetentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
@@@ -309,15 -307,14 +309,15 @@@
*/
private def flushDirtyLogs() = {
debug("Checking for dirty logs to flush...")
- for (log <- allLogs) {
+ for ((topicAndPartition, log) <- logs) {
try {
- val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
+ val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
+
- var logFlushInterval = config.defaultFlushIntervalMs
+ var logFlushInterval = config.logFlushIntervalMs
- if(logFlushIntervals.contains(log.topicName))
- logFlushInterval = logFlushIntervals(log.topicName)
- debug(log.topicName + " flush interval " + logFlushInterval +
- " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush)
+ if(logFlushIntervals.contains(topicAndPartition.topic))
+ logFlushInterval = logFlushIntervals(topicAndPartition.topic)
+ debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + logFlushInterval +
+ " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= logFlushInterval)
log.flush
} catch {
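flushDirtyLogs() above now resolves the flush interval per topic from the renamed properties. As a sketch, the per-topic map is built from log.flush.interval.ms.per.topic and the default comes from log.flush.interval.ms:

def flushIntervalFor(topic: String, perTopicMs: Map[String, Int], defaultMs: Int): Int =
  perTopicMs.getOrElse(topic, defaultMs)

Also note the new InitialTaskDelayMs: the cleaner and the flusher now share a 30-second initial delay after broker startup rather than their previous ad-hoc delays.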
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/OffsetIndex.scala
index 0e18f28,43b3575..1662f10
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@@ -83,18 -83,19 +83,21 @@@ class OffsetIndex(val file: File, val b
Utils.swallow(raf.close())
}
}
-
+
- /* the number of entries in the index */
- private var size = new AtomicInteger(mmap.position / 8)
-
- /* the last offset in the index */
- var lastOffset = readLastOffset()
-
+ info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
+ .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
+
- /* the maximum number of entries this index can hold */
+ /**
+ * The maximum number of eight-byte entries this index can hold
+ */
def maxEntries = mmap.limit / 8
+
+ /* the number of eight-byte entries currently in the index */
+ private var size = new AtomicInteger(mmap.position / 8)
+
+ /* the last offset in the index */
+ var lastOffset = readLastOffset()
-
+
/**
* The last offset written to the index
*/
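The reordering above keeps the entry layout unchanged: the index holds eight-byte entries, which is what the mmap.position / 8 and mmap.limit / 8 expressions count. The 4-byte relative offset plus 4-byte file position split below is background on this tree's index format rather than something shown in the diff:

// Sketch of the eight-byte entry layout behind the "/ 8" arithmetic.
val entrySize = 8
def entries(mmapPositionBytes: Int): Int = mmapPositionBytes / entrySize
def relativeOffset(buf: java.nio.ByteBuffer, n: Int): Int = buf.getInt(n * entrySize)
def physicalPosition(buf: java.nio.ByteBuffer, n: Int): Int = buf.getInt(n * entrySize + 4)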
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaApis.scala
index 4283973,60752fb..a166a1c
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@@ -21,8 -21,6 +21,8 @@@ import kafka.admin.{CreateTopicCommand
import kafka.api._
import kafka.message._
import kafka.network._
+import kafka.log._
- import kafka.utils.{Pool, SystemTime, Logging, ZkUtils, ZKGroupTopicDirs}
++import kafka.utils.ZKGroupTopicDirs
import org.apache.log4j.Logger
import scala.collection._
import kafka.network.RequestChannel.Response
@@@ -39,13 -38,12 +40,13 @@@ import kafka.utils.{ZkUtils, Pool, Syst
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val zkClient: ZkClient,
- brokerId: Int) extends Logging {
+ val brokerId: Int,
+ val config: KafkaConfig) extends Logging {
private val producerRequestPurgatory =
- new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
+ new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
private val fetchRequestPurgatory =
- new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval)
+ new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
private val delayedRequestMetrics = new DelayedRequestMetrics
private val requestLogger = Logger.getLogger("kafka.request.logger")
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaConfig.scala
index ccc35d3,f65db33..51ea727
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@@ -36,22 -36,19 +36,22 @@@ class KafkaConfig private (val props: V
/*********** General Configuration ***********/
/* the broker id for this server */
- val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
+ val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
/* the maximum size of message that the server can receive */
- val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
+ val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue))
/* the number of network threads that the server uses for handling network requests */
- val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
+ val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
/* the number of io threads that the server uses for carrying out network requests */
- val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
+ val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
+ /* the number of threads to use for various background processing tasks */
+ val backgroundThreads = props.getIntInRange("background.threads", 4, (1, Int.MaxValue))
+
/* the number of queued requests allowed before blocking the network threads */
- val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
+ val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
/*********** Socket Server Configuration ***********/
@@@ -114,21 -111,19 +114,21 @@@
val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
/* the number of messages accumulated on a log partition before messages are flushed to disk */
- val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
-
+ val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
+
+ val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
+
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
- val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
+ val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
/* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
- val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000)
+ val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms", 3000)
/* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
- val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
+ val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs)
/* enable auto creation of topic on the server */
- val autoCreateTopics = props.getBoolean("auto.create.topics", true)
+ val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
/*********** Replication configuration ***********/
@@@ -162,20 -159,15 +164,20 @@@
/* number of fetcher threads used to replicate messages from a source broker.
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
- val numReplicaFetchers = props.getInt("replica.fetchers", 1)
+ val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
- /* the frequency with which the highwater mark is saved out to disk */
- val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
+ /* the frequency with which the high watermark is saved out to disk */
+ val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
/* the purge interval (in number of requests) of the fetch request purgatory */
- val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
+ val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
/* the purge interval (in number of requests) of the producer request purgatory */
- val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+ val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
+ /*********** Misc configuration ***********/
+
+ /* the maximum size for a metadata entry associated with an offset commit */
+ val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", 1024)
+
}
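Taken together, a server.properties fragment written against the renamed keys in this hunk, with the default values the diff shows (only renamed or newly added keys listed; broker.id has no default and is set here for illustration):

broker.id=0
message.max.bytes=1000000
num.network.threads=3
num.io.threads=8
background.threads=4
queued.max.requests=500
log.flush.interval.messages=500
log.segment.delete.delay.ms=60000
log.flush.scheduler.interval.ms=3000
log.flush.interval.ms=3000
auto.create.topics.enable=true
num.replica.fetchers=1
replica.high.watermark.checkpoint.interval.ms=5000
fetch.purgatory.purge.interval.requests=10000
producer.purgatory.purge.interval.requests=10000
offset.metadata.max.bytes=1024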
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaManager.scala
index 6a8213c,064af6b..7810c21
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@@ -72,7 -72,7 +72,7 @@@ class ReplicaManager(val config: KafkaC
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
- scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.highWaterMarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
- kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.replicaHighWatermarkCheckpointIntervalMs)
++ scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}
/**
@@@ -91,7 -91,7 +91,7 @@@
def startup() {
// start ISR expiration thread
- scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaMaxLagTimeMs, unit = TimeUnit.MILLISECONDS)
- kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs)
++ scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
}
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/other/kafka/TestLogPerformance.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/other/kafka/TestLogPerformance.scala
index 67fb168,9f3bb40..a7b661a
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala
@@@ -33,8 -33,7 +33,8 @@@ object TestLogPerformance
val props = TestUtils.createBrokerConfig(0, -1)
val config = new KafkaConfig(props)
val dir = TestUtils.tempDir()
- val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
+ val scheduler = new KafkaScheduler(1)
- val log = new Log(dir, scheduler, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime)
++ val log = new Log(dir, scheduler, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7fc154f,ce893bf..f48c709
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@@ -39,11 -40,12 +39,11 @@@ class LogManagerTest extends JUnit3Suit
override def setUp() {
super.setUp()
config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
- override val logFileSize = 1024
- override val flushInterval = 10000
+ override val logSegmentBytes = 1024
+ override val logFlushIntervalMessages = 10000
override val logRetentionHours = maxLogAgeHours
}
- scheduler.startup
- logManager = new LogManager(config, scheduler, time)
+ logManager = new LogManager(config, time.scheduler, time)
logManager.startup
logDir = logManager.logDirs(0)
}
@@@ -119,11 -112,15 +119,11 @@@
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
- override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
- override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
+ override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages
+ override val logRetentionBytes = (5 * 10 * setSize + 10).asInstanceOf[Long]
- override val logRetentionHours = retentionHours
- override val logFlushIntervalMessages = 100
override val logRollHours = maxRollInterval
}
- logManager = new LogManager(config, scheduler, time)
+ logManager = new LogManager(config, time.scheduler, time)
logManager.startup
// create a log
@@@ -131,21 -128,20 +131,21 @@@
var offset = 0L
// add a bunch of messages that should be larger than the retentionSize
- for(i <- 0 until 1000) {
+ val numMessages = 200
+ for(i <- 0 until numMessages) {
val set = TestUtils.singleMessageSet("test".getBytes())
- val (start, end) = log.append(set)
- offset = start
+ val info = log.append(set)
+ offset = info.firstOffset
}
- // flush to make sure it's written to disk
- log.flush
// should be exactly 100 full segments + 1 new empty one
- assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.logFileSize, log.numberOfSegments)
- assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
++ assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.logSegmentBytes, log.numberOfSegments)
// this cleanup shouldn't find any expired segments but should delete some to reduce size
- logManager.cleanupLogs()
+ time.sleep(logManager.InitialTaskDelayMs)
assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
+ time.sleep(log.segmentDeleteDelayMs + 1)
+ assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
try {
log.read(0, 1024)
@@@ -165,14 -158,15 +165,14 @@@
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
- override val flushSchedulerThreadRate = 1000
- override val defaultFlushIntervalMs = 1000
- override val flushInterval = Int.MaxValue
- override val logSegmentBytes = 1024 *1024 *1024
- override val logFlushSchedulerIntervalMs = 50
++ override val logFlushSchedulerIntervalMs = 1000
++ override val logFlushIntervalMs = 1000
+ override val logFlushIntervalMessages = Int.MaxValue
- override val logRollHours = maxRollInterval
- override val logFlushIntervalMsPerTopicMap = Map("timebasedflush" -> 100)
}
- logManager = new LogManager(config, scheduler, time)
+ logManager = new LogManager(config, time.scheduler, time)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
+ val lastFlush = log.lastFlushTime
for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
log.append(set)
@@@ -191,9 -183,9 +191,9 @@@
val dirs = Seq(TestUtils.tempDir().getAbsolutePath,
TestUtils.tempDir().getAbsolutePath,
TestUtils.tempDir().getAbsolutePath)
- props.put("log.directories", dirs.mkString(","))
+ props.put("log.dirs", dirs.mkString(","))
logManager.shutdown()
- logManager = new LogManager(new KafkaConfig(props), scheduler, time)
+ logManager = new LogManager(new KafkaConfig(props), time.scheduler, time)
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
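These tests now drive background work through time.scheduler, so calls like time.sleep(logManager.InitialTaskDelayMs) make cleanup run deterministically instead of waiting on a real thread. A minimal sketch of that mock pattern (the real MockTime/MockScheduler live in the test utilities, so the details here are assumptions):

import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuffer

class MockScheduler {
  private case class Task(var dueMs: Long, periodMs: Long, fun: () => Unit)
  private val tasks = new ArrayBuffer[Task]
  def schedule(name: String, fun: () => Unit,
               delay: Long, period: Long, unit: TimeUnit) {
    tasks += Task(unit.toMillis(delay), unit.toMillis(period), fun)
  }
  // Run every task whose due time has passed; invoked whenever the clock moves.
  def tick(nowMs: Long) {
    for (t <- tasks)
      while (t.periodMs > 0 && nowMs >= t.dueMs) { t.fun(); t.dueMs += t.periodMs }
  }
}

class MockTime {
  val scheduler = new MockScheduler
  private var nowMs = 0L
  def milliseconds: Long = nowMs
  def sleep(ms: Long) { nowMs += ms; scheduler.tick(nowMs) } // fires due tasks synchronously
}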
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/log/LogTest.scala
index 109474c,786ae03..0fc74fa
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@@ -65,8 -62,8 +65,8 @@@ class LogTest extends JUnitSuite
val time: MockTime = new MockTime()
// create a log
- val log = new Log(logDir, time.scheduler, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time)
- val log = new Log(logDir, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time)
- time.currentMs += rollMs + 1
++ val log = new Log(logDir, time.scheduler, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time)
+ time.sleep(rollMs + 1)
// segment age is less than its limit
log.append(set)
@@@ -98,7 -96,7 +98,7 @@@
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
- val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
- val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@@@ -114,78 -109,29 +114,78 @@@
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ log.append(TestUtils.singleMessageSet("test".getBytes))
}
+ /**
+ * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets.
+ */
@Test
- def testAppendAndRead() {
- val log = new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val message = new Message(Integer.toString(42).getBytes())
- for(i <- 0 until 10)
- log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
- log.flush()
- val messages = log.read(0, 1024)
- var current = 0
- for(curr <- messages) {
- assertEquals("Read message should equal written", message, curr.message)
- current += 1
+ def testAppendAndReadWithSequentialOffsets() {
- val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, 71, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
+
+ for(i <- 0 until messages.length)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
+ for(i <- 0 until messages.length) {
+ val read = log.read(i, 100, Some(i+1)).head
+ assertEquals("Offset read should match order appended.", i, read.offset)
+ assertEquals("Message should match appended.", messages(i), read.message)
}
- assertEquals(10, current)
+ assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
}
-
+
+ /**
+ * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
+ * from any offset less than the logEndOffset including offsets not appended.
+ */
+ @Test
+ def testAppendAndReadWithNonSequentialOffsets() {
- val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, 71, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
+ val messages = messageIds.map(id => new Message(id.toString.getBytes))
+
+ // now test the case that we give the offsets and use non-sequential offsets
+ for(i <- 0 until messages.length)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
+ for(i <- 50 until messageIds.max) {
+ val idx = messageIds.indexWhere(_ >= i)
+ val read = log.read(i, 100, None).head
+ assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
+ assertEquals("Message should match appended.", messages(idx), read.message)
+ }
+ }
+
+ /**
+ * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
+ * Specifically we create a log where the last message in the first segment has offset 0. If we
+ * then read offset 1, we should expect this read to come from the second segment, even though the
+ * first segment has the greatest lower bound on the offset.
+ */
+ @Test
+ def testReadAtLogGap() {
- val log = new Log(logDir, time.scheduler, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, 300, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+
+ // keep appending until we have two segments with only a single message in the second segment
+ while(log.numberOfSegments == 1)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+
+ // now manually truncate off all but one message from the first segment to create a gap in the messages
+ log.logSegments.head.truncateTo(1)
+
+ assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
+ }
+
+ /**
+ * Test reading at the boundary of the log, specifically
+ * - reading from the logEndOffset should give an empty message set
+ * - reading beyond the log end offset should throw an OffsetOutOfRangeException
+ */
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
- val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val log = new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
try {
log.read(0, 1024)
@@@ -208,10 -151,10 +208,10 @@@
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
- val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
- val offsets = messageSets.map(log.append(_)._1)
+ val offsets = messageSets.map(log.append(_).firstOffset)
log.flush
/* do successive reads to ensure all our messages are there */
@@@ -232,8 -173,8 +232,8 @@@
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
- val log = new Log(logDir, time.scheduler, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-
- val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-
++ val log = new Log(logDir, time.scheduler, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
@@@ -246,46 -187,66 +246,46 @@@
assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
}
-
- @Test
- def testFindSegment() {
- assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
- assertEquals("Search in segment list just outside the range of the last segment should find last segment",
- 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start)
- assertEquals("Search in segment list far outside the range of the last segment should find last segment",
- 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start)
- assertEquals("Search in segment list far outside the range of the last segment should find last segment",
- None, Log.findRange(makeRanges(5, 9, 12), -1))
- assertContains(makeRanges(5, 9, 12), 11)
- assertContains(makeRanges(5), 4)
- assertContains(makeRanges(5,8), 5)
- assertContains(makeRanges(5,8), 6)
- }
+ /**
+ * Test garbage collecting old segments
+ */
@Test
- def testEdgeLogRollsStartingAtZero() {
- // first test a log segment starting at 0
- val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val curOffset = log.logEndOffset
- assertEquals(curOffset, 0)
-
- // time goes by; the log file is deleted
- log.markDeletedWhile(_ => true)
-
- // we now have a new log; the starting offset of the new log should remain 0
- assertEquals(curOffset, log.logEndOffset)
- log.delete()
- }
-
- @Test
- def testEdgeLogRollsStartingAtNonZero() {
- // second test an empty log segment starting at non-zero
- val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
- val numMessages = 1
- for(i <- 0 until numMessages)
- log.append(TestUtils.singleMessageSet(i.toString.getBytes))
- val curOffset = log.logEndOffset
-
- // time goes by; the log file is deleted
- log.markDeletedWhile(_ => true)
-
- // we now have a new log
- assertEquals(curOffset, log.logEndOffset)
-
- // time goes by; the log file (which is empty) is deleted again
- val deletedSegments = log.markDeletedWhile(_ => true)
-
- // we shouldn't delete the last empty log segment.
- assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
-
- // we now have a new log
- assertEquals(curOffset, log.logEndOffset)
+ def testThatGarbageCollectingSegmentsDoesntChangeOffset() {
+ for(messagesToAppend <- List(0, 1, 25)) {
+ logDir.mkdirs()
+ // first test a log segment starting at 0
- val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ for(i <- 0 until messagesToAppend)
+ log.append(TestUtils.singleMessageSet(i.toString.getBytes))
+
+ var currOffset = log.logEndOffset
+ assertEquals(currOffset, messagesToAppend)
+
+ // time goes by; the log file is deleted
+ log.deleteOldSegments(_ => true)
+
+ assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
+ assertEquals("We should still have one segment left", 1, log.numberOfSegments)
+ assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
+ assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
+ assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
+ currOffset,
+ log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
+
+ // cleanup the log
+ log.delete()
+ }
}
+ /**
+ * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
+ * setting and checking that an exception is thrown.
+ */
@Test
def testMessageSizeCheck() {
- val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes()))
- val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
+ val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
+ val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes))
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
@@@ -311,7 -269,7 +311,7 @@@
val messageSize = 100
val segmentSize = 7 * messageSize
val indexInterval = 3 * messageSize
- var log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
- var log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
++ var log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
@@@ -320,42 -278,17 +320,42 @@@
log.close()
// test non-recovery case
- log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
- log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
++ log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
- assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
- assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
+ assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
+ assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
- // test
- log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+ // test recovery case
- log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
++ log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
- assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
- assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
+ assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
+ assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+ log.close()
+ }
+
+ /**
+ * Test that if we manually delete the index files they are rebuilt when the log is re-opened
+ */
+ @Test
+ def testIndexRebuild() {
+ // publish the messages and close the log
+ val numMessages = 200
- var log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
++ var log = new Log(logDir, time.scheduler, 200, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+ for(i <- 0 until numMessages)
+ log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+ val indexFiles = log.logSegments.map(_.index.file)
+ log.close()
+
+ // delete all the index files
+ indexFiles.foreach(_.delete())
+
+ // reopen the log
- log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
++ log = new Log(logDir, time.scheduler, 200, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+
+ assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
+ for(i <- 0 until numMessages)
+ assertEquals(i, log.read(i, 100, None).head.offset)
log.close()
}
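The point of this test: the offset index is derived data. When the log is reopened and an index file is missing, the segment data is rescanned and the index regenerated (here under needsRecovery = true), which the final loop verifies by reading every offset back.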
@@@ -370,7 -300,7 +370,7 @@@
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
- val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
- val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+ for(i <- 1 to msgPerSeg)
@@@ -422,7 -349,7 +422,7 @@@
val setSize = set.sizeInBytes
val msgPerSeg = 10
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
- val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
- val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
++ val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for(i <- 1 to msgPerSeg)
log.append(set)
@@@ -438,7 -365,33 +438,7 @@@
log.append(set)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
}
-
+
-
- @Test
- def testAppendWithoutOffsetAssignment() {
- for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
- logDir.mkdir()
- var log = new Log(logDir,
- maxLogFileSize = 64*1024,
- maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
- indexIntervalBytes = 10000,
- needsRecovery = true)
- val messages = List("one", "two", "three", "four", "five", "six")
- val ms = new ByteBufferMessageSet(compressionCodec = codec,
- offsetCounter = new AtomicLong(5),
- messages = messages.map(s => new Message(s.getBytes)):_*)
- val firstOffset = ms.shallowIterator.toList.head.offset
- val lastOffset = ms.shallowIterator.toList.last.offset
- val (first, last) = log.append(ms, assignOffsets = false)
- assertEquals(last + 1, log.logEndOffset)
- assertEquals(firstOffset, first)
- assertEquals(lastOffset, last)
- assertTrue(log.read(5, 64*1024).size > 0)
- log.delete()
- }
- }
-
/**
* When we open a log, any index files without an associated log segment should be deleted.
*/
@@@ -449,10 -402,9 +449,10 @@@
val set = TestUtils.singleMessageSet("test".getBytes())
val log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.maxMessageSize,
- maxIndexSize = 1000,
+ maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
++ maxIndexSize = 1000,
indexIntervalBytes = 1,
needsRecovery = false)
@@@ -475,10 -424,9 +475,10 @@@
// create a log
var log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.maxMessageSize,
- maxIndexSize = 1000,
+ maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
++ maxIndexSize = 1000,
indexIntervalBytes = 10000,
needsRecovery = true)
@@@ -487,10 -435,9 +487,10 @@@
log.append(set)
log.close()
log = new Log(logDir,
- maxLogFileSize = set.sizeInBytes * 5,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.maxMessageSize,
- maxIndexSize = 1000,
+ maxMessageSize = config.messageMaxBytes,
- maxIndexSize = 1000,
++ maxIndexSize = 1000,
indexIntervalBytes = 10000,
needsRecovery = true)
log.truncateTo(3)
@@@ -498,68 -445,24 +498,68 @@@
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
}
- def assertContains(ranges: Array[Range], offset: Long) = {
- Log.findRange(ranges, offset) match {
- case Some(range) =>
- assertTrue(range + " does not contain " + offset, range.contains(offset))
- case None => fail("No range found, but expected to find " + offset)
- }
+ /**
+ * Test that segments scheduled for deletion are first renamed and only physically deleted after the configured delay.
+ */
+ @Test
+ def testAsyncDelete() {
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ val asyncDeleteMs = 1000
+ val log = new Log(logDir,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.maxMessageSize,
++ maxMessageSize = config.messageMaxBytes,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
+ segmentDeleteDelayMs = asyncDeleteMs,
+ needsRecovery = true)
+
+ // append some messages to create some segments
+ for(i <- 0 until 100)
+ log.append(set)
+
+ // files should be renamed
+ val segments = log.logSegments.toArray
+ log.deleteOldSegments(_ => true)
+
+ assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
+ val renamed = segments.map(segment => new File(segment.log.file.getPath + Log.DeletedFileSuffix))
+ assertTrue("Files should all be renamed to .deleted.", renamed.forall(_.exists))
+
+ // when enough time passes the files should be deleted
+ time.sleep(asyncDeleteMs + 1)
+ assertTrue("Files should all be gone.", renamed.forall(!_.exists))
}
- class SimpleRange(val start: Long, val size: Long) extends Range
-
- def makeRanges(breaks: Int*): Array[Range] = {
- val list = new ArrayList[Range]
- var prior = 0
- for(brk <- breaks) {
- list.add(new SimpleRange(prior, brk - prior))
- prior = brk
- }
- list.toArray(new Array[Range](list.size))
+ /**
+ * Any files ending in .deleted should be removed when the log is re-opened.
+ */
+ @Test
+ def testOpenDeletesObsoleteFiles() {
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ var log = new Log(logDir,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.maxMessageSize,
++ maxMessageSize = config.messageMaxBytes,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
+ needsRecovery = false)
+
+ // append some messages to create some segments
+ for(i <- 0 until 100)
+ log.append(set)
+
+ log.deleteOldSegments(_ => true)
+ log.close()
+
+ log = new Log(logDir,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
- maxMessageSize = config.maxMessageSize,
++ maxMessageSize = config.messageMaxBytes,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
+ needsRecovery = false)
+ assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index f69b379,0000000..e3752cb
mode 100644,000000..100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@@ -1,219 -1,0 +1,219 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import kafka.utils._
+import junit.framework.Assert._
+import java.util.{Random, Properties}
+import kafka.consumer.SimpleConsumer
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
+import kafka.utils.TestUtils._
+import kafka.common.{ErrorMapping, TopicAndPartition}
+
+class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
+ val random = new Random()
+ var logDir: File = null
+ var topicLogDir: File = null
+ var server: KafkaServer = null
+ var logSize: Int = 100
+ val brokerPort: Int = 9099
+ var simpleConsumer: SimpleConsumer = null
+ var time: Time = new MockTime()
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ val config: Properties = createBrokerConfig(1, brokerPort)
+ val logDirPath = config.getProperty("log.dir")
+ logDir = new File(logDirPath)
+ time = new MockTime()
+ server = TestUtils.createServer(new KafkaConfig(config), time)
+ simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
+ }
+
+ @After
+ override def tearDown() {
+ simpleConsumer.close()
+ server.shutdown()
+ Utils.rm(logDir)
+ super.tearDown()
+ }
+
+ @Test
+ def testGetOffsetsForUnknownTopic() {
+ val topicAndPartition = TopicAndPartition("foo", 0)
+ val request = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
+ val offsetResponse = simpleConsumer.getOffsetsBefore(request)
+ assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+ offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
+ }
+
+ @Test
+ def testGetOffsetsBeforeLatestTime() {
+ val topicPartition = "kafka-" + 0
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ // set up brokers in ZooKeeper as owners of partitions for this test
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
+ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest = OffsetRequest(
+ Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
+ replicaId = 0)
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+
+ // try to fetch using latest offset
+ val fetchResponse = simpleConsumer.fetch(
+ new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
+ assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
+ }
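A note on the expected Seq(20L, 16L, 12L, 8L, 4L, 0L): createBrokerConfig at the bottom of this file sets log.segment.bytes to logSize = 100, and the asserted offsets imply four of these single-message sets fit per segment, so segments begin at offsets 0, 4, 8, 12 and 16. A LatestTime offset query returns the log end offset (20) followed by the segment base offsets, newest first.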
+
+ @Test
+ def testEmptyLogsGetOffsets() {
+ val topicPartition = "kafka-" + random.nextInt(10)
+ val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+ topicLogDir = new File(topicPartitionPath)
+ topicLogDir.mkdir
+
+ val topic = topicPartition.split("-").head
+
+ // set up brokers in ZooKeeper as owners of partitions for this test
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+
+ var offsetChanged = false
+ for(i <- 1 to 14) {
+ val topicAndPartition = TopicAndPartition(topic, 0)
+ val offsetRequest =
+ OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+
+ if(consumerOffsets(0) == 1) {
+ offsetChanged = true
+ }
+ }
+ assertFalse(offsetChanged)
+ }
+
+ @Test
+ def testGetOffsetsBeforeNow() {
+ val topicPartition = "kafka-" + random.nextInt(3)
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ // set up brokers in ZooKeeper as owners of partitions for this test
+ CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
+
+ val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
+ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
+
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
+ }
+
+ @Test
+ def testGetOffsetsBeforeEarliestTime() {
+ val topicPartition = "kafka-" + random.nextInt(3)
+ val topic = topicPartition.split("-").head
+ val part = Integer.valueOf(topicPartition.split("-").last).intValue
+
+ // set up brokers in ZooKeeper as owners of partitions for this test
+ CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
+
+ val logManager = server.getLogManager
+ val log = logManager.getOrCreateLog(topic, part)
+ val message = new Message(Integer.toString(42).getBytes())
+ for(i <- 0 until 20)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+ log.flush()
+
+ val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10)
+
+ assertEquals(Seq(0L), offsets)
+
+ waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
+ val topicAndPartition = TopicAndPartition(topic, part)
+ val offsetRequest =
+ OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
+ val consumerOffsets =
+ simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+ assertEquals(Seq(0L), consumerOffsets)
+ }
+
+ private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+ val props = new Properties
- props.put("brokerid", nodeId.toString)
++ props.put("broker.id", nodeId.toString)
+ props.put("port", port.toString)
+ props.put("log.dir", getLogDir.getAbsolutePath)
- props.put("log.flush.interval", "1")
++ props.put("log.flush.interval.messages", "1")
+ props.put("enable.zookeeper", "false")
+ props.put("num.partitions", "20")
+ props.put("log.retention.hours", "10")
+ props.put("log.cleanup.interval.mins", "5")
- props.put("log.file.size", logSize.toString)
++ props.put("log.segment.bytes", logSize.toString)
+ props.put("zk.connect", zkConnect.toString)
+ props
+ }
+
+ private def getLogDir(): File = TestUtils.tempDir()
+
+}
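The resolved lines above (and the TestUtils hunk below) show the 0.8 broker-config renames this merge carries through the tests:

    brokerid            ->  broker.id
    hostname            ->  host.name
    log.flush.interval  ->  log.flush.interval.messages
    log.file.size       ->  log.segment.bytes

together with the config.maxMessageSize -> config.messageMaxBytes accessor rename in the Log constructor calls earlier in this diff.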
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d46d47e,9400328..5547d63
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@@ -123,10 -123,11 +123,10 @@@ object TestUtils extends Logging
*/
def createBrokerConfig(nodeId: Int, port: Int): Properties = {
val props = new Properties
- props.put("brokerid", nodeId.toString)
- props.put("hostname", "localhost")
+ props.put("broker.id", nodeId.toString)
+ props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
- props.put("log.flush.interval.messages", "1")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
props.put("replica.socket.timeout.ms", "1500")
props
http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/examples/src/main/java/kafka/examples/Consumer.java
----------------------------------------------------------------------