You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/19 16:52:44 UTC
kafka git commit: kafka-1646;
Improve consumer read performance for Windows; patched by Honghai Chen;
reviewed by Jay Kreps and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 19c98cb8e -> ca758252c
kafka-1646; Improve consumer read performance for Windows; patched by Honghai Chen; reviewed by Jay Kreps and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca758252
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca758252
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca758252
Branch: refs/heads/trunk
Commit: ca758252c5a524fe6135a585282dd4bf747afef2
Parents: 19c98cb
Author: Honghai Chen <wa...@163.com>
Authored: Fri Jun 19 07:52:37 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jun 19 07:52:37 2015 -0700
----------------------------------------------------------------------
.../main/scala/kafka/log/FileMessageSet.scala | 58 +++++++++++++++++++-
core/src/main/scala/kafka/log/Log.scala | 30 ++++++++--
core/src/main/scala/kafka/log/LogCleaner.scala | 2 +-
core/src/main/scala/kafka/log/LogConfig.scala | 6 ++
core/src/main/scala/kafka/log/LogSegment.scala | 4 +-
.../main/scala/kafka/server/KafkaConfig.scala | 5 ++
.../main/scala/kafka/server/KafkaServer.scala | 1 +
core/src/main/scala/kafka/utils/CoreUtils.scala | 10 ----
.../unit/kafka/log/FileMessageSetTest.scala | 55 ++++++++++++++++++-
.../scala/unit/kafka/log/LogConfigTest.scala | 1 +
.../scala/unit/kafka/log/LogSegmentTest.scala | 54 +++++++++++++++++-
.../kafka/server/KafkaConfigConfigDefTest.scala | 1 +
12 files changed, 204 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 2522604..39361fe 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -54,7 +54,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
/* if this is not a slice, update the file pointer to the end of the file */
if (!isSlice)
/* set the file position to the last byte in the file */
- channel.position(channel.size)
+ channel.position(math.min(channel.size().toInt, end))
/**
* Create a file message set with no slicing.
@@ -66,12 +66,25 @@ class FileMessageSet private[kafka](@volatile var file: File,
* Create a file message set with no slicing
*/
def this(file: File) =
- this(file, CoreUtils.openChannel(file, mutable = true))
+ this(file, FileMessageSet.openChannel(file, mutable = true))
+
+ /**
+ * Create a file message set with no slicing, and with initFileSize and preallocate.
+ * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
+ * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
+ * If it's new file and preallocate is true, end will be set to 0. Otherwise set to Int.MaxValue.
+ */
+ def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
+ this(file,
+ channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
+ start = 0,
+ end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
+ isSlice = false)
/**
* Create a file message set with mutable option
*/
- def this(file: File, mutable: Boolean) = this(file, CoreUtils.openChannel(file, mutable))
+ def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable))
/**
* Create a slice view of the file message set that begins and ends at the given byte offsets
@@ -223,10 +236,18 @@ class FileMessageSet private[kafka](@volatile var file: File,
*/
def close() {
flush()
+ trim()
channel.close()
}
/**
+ * Trim file when close or roll to next file
+ */
+ def trim() {
+ truncateTo(sizeInBytes())
+ }
+
+ /**
* Delete this message set from the filesystem
* @return True iff this message set was deleted.
*/
@@ -272,6 +293,37 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
}
+
+object FileMessageSet
+{
+ /**
+ * Open a channel for the given file
+ * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
+ * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
+ * @param file File path
+ * @param mutable mutable
+ * @param fileAlreadyExists File already exists or not
+ * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
+ * @param preallocate Pre allocate file or not, gotten from configuration.
+ */
+ def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
+ if (mutable) {
+ if (fileAlreadyExists)
+ new RandomAccessFile(file, "rw").getChannel()
+ else {
+ if (preallocate) {
+ val randomAccessFile = new RandomAccessFile(file, "rw")
+ randomAccessFile.setLength(initFileSize)
+ randomAccessFile.getChannel()
+ }
+ else
+ new RandomAccessFile(file, "rw").getChannel()
+ }
+ }
+ else
+ new FileInputStream(file).getChannel()
+ }
+}
object LogFlushStats extends KafkaMetricsGroup {
val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 84e7b8f..6b9274d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -78,6 +78,13 @@ class Log(val dir: File,
/* last time it was flushed */
private val lastflushedTime = new AtomicLong(time.milliseconds)
+ def initFileSize() : Int = {
+ if (config.preallocate)
+ config.segmentSize
+ else
+ 0
+ }
+
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
loadSegments()
@@ -168,7 +175,8 @@ class Log(val dir: File,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
- time = time)
+ time = time,
+ fileAlreadyExists = true)
if(!hasIndex) {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(config.maxMessageSize)
@@ -205,7 +213,10 @@ class Log(val dir: File,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
- time = time))
+ time = time,
+ fileAlreadyExists = false,
+ initFileSize = this.initFileSize(),
+ preallocate = config.preallocate))
} else {
recoverLog()
// reset the index size of the currently active log segment to allow more entries
@@ -586,14 +597,20 @@ class Log(val dir: File,
segments.lastEntry() match {
case null =>
- case entry => entry.getValue.index.trimToValidSize()
+ case entry => {
+ entry.getValue.index.trimToValidSize()
+ entry.getValue.log.trim()
+ }
}
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
- time = time)
+ time = time,
+ fileAlreadyExists = false,
+ initFileSize = initFileSize,
+ preallocate = config.preallocate)
val prev = addSegment(segment)
if(prev != null)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
@@ -687,7 +704,10 @@ class Log(val dir: File,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
- time = time))
+ time = time,
+ fileAlreadyExists = false,
+ initFileSize = initFileSize,
+ preallocate = config.preallocate))
updateLogEndOffset(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c9ade72..d07a391 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -344,7 +344,7 @@ private[log] class Cleaner(val id: Int,
logFile.delete()
val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
indexFile.delete()
- val messages = new FileMessageSet(logFile)
+ val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index e9af221..fc41132 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -43,6 +43,7 @@ object Defaults {
val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
val CompressionType = kafka.server.Defaults.CompressionType
+ val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
}
case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) {
@@ -64,6 +65,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
+ val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -95,6 +97,7 @@ object LogConfig {
val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
val CompressionTypeProp = "compression.type"
+ val PreAllocateEnableProp = "preallocate"
val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
@@ -118,6 +121,7 @@ object LogConfig {
val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " +
"standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " +
"no compression; and 'producer' which means retain the original compression codec set by the producer."
+ val PreAllocateEnableDoc ="Should pre allocate file when create new segment?"
private val configDef = {
import ConfigDef.Range._
@@ -149,6 +153,8 @@ object LogConfig {
MEDIUM, UncleanLeaderElectionEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
.define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
+ .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable,
+ MEDIUM, PreAllocateEnableDoc)
}
def apply(): LogConfig = LogConfig(new Properties())
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index ed03953..1377e8f 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -52,8 +52,8 @@ class LogSegment(val log: FileMessageSet,
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0
- def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
- this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
+ def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
+ this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
startOffset,
indexIntervalBytes,
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e0b2480..c1f0cca 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -83,6 +83,7 @@ object Defaults {
val LogDeleteDelayMs = 60000
val LogFlushSchedulerIntervalMs = Long.MaxValue
val LogFlushOffsetCheckpointIntervalMs = 60000
+ val LogPreAllocateEnable = false
val NumRecoveryThreadsPerDataDir = 1
val AutoCreateTopicsEnable = true
val MinInSyncReplicas = 1
@@ -206,6 +207,7 @@ object KafkaConfig {
val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
val LogFlushIntervalMsProp = "log.flush.interval.ms"
val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
+ val LogPreAllocateProp = "log.preallocate"
val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
@@ -332,6 +334,7 @@ object KafkaConfig {
val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk"
val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk"
val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point"
+ val LogPreAllocateEnableDoc = "Should pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true."
val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)"
@@ -466,6 +469,7 @@ object KafkaConfig {
.define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc)
.define(LogFlushIntervalMsProp, LONG, HIGH, LogFlushIntervalMsDoc, false)
.define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc)
+ .define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc)
.define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
.define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
@@ -609,6 +613,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
+ val logPreAllocateEnable: Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9de2a6f..52dc728 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -443,6 +443,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
+ case KafkaConfig.LogPreAllocateProp => logProps.put(LogConfig.PreAllocateEnableProp, entry.getValue)
case _ => // we just leave those out
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index f5d704c..168a18d 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -70,16 +70,6 @@ object CoreUtils extends Logging {
Utils.daemonThread(name, runnable(fun))
/**
- * Open a channel for the given file
- */
- def openChannel(file: File, mutable: Boolean): FileChannel = {
- if(mutable)
- new RandomAccessFile(file, "rw").getChannel()
- else
- new FileInputStream(file).getChannel()
- }
-
- /**
* Do the given action and log any exceptions thrown without rethrowing them
* @param log The log method to use for logging. E.g. logger.warn
* @param action The action to execute
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index cec1cae..02cf668 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -17,6 +17,7 @@
package kafka.log
+import java.io._
import java.nio._
import java.util.concurrent.atomic._
import junit.framework.Assert._
@@ -146,5 +147,57 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
assertEquals(List(message), messageSet.toList)
assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
}
-
+
+ /**
+ * Test the new FileMessageSet with pre allocate as true
+ */
+ @Test
+ def testPreallocateTrue() {
+ val temp = tempFile()
+ val set = new FileMessageSet(temp, false, 512 *1024 *1024, true)
+ val position = set.channel.position
+ val size = set.sizeInBytes()
+ assertEquals(0, position)
+ assertEquals(0, size)
+ assertEquals(512 *1024 *1024, temp.length)
+ }
+
+ /**
+ * Test the new FileMessageSet with pre allocate as false
+ */
+ @Test
+ def testPreallocateFalse() {
+ val temp = tempFile()
+ val set = new FileMessageSet(temp, false, 512 *1024 *1024, false)
+ val position = set.channel.position
+ val size = set.sizeInBytes()
+ assertEquals(0, position)
+ assertEquals(0, size)
+ assertEquals(0, temp.length)
+ }
+
+ /**
+ * Test the new FileMessageSet with pre allocate as true and file has been clearly shut down, the file will be truncate to end of valid data.
+ */
+ @Test
+ def testPreallocateClearShutdown() {
+ val temp = tempFile()
+ val set = new FileMessageSet(temp, false, 512 *1024 *1024, true)
+ set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
+ val oldposition = set.channel.position
+ val oldsize = set.sizeInBytes()
+ assertEquals(messageSet.sizeInBytes, oldposition)
+ assertEquals(messageSet.sizeInBytes, oldsize)
+ set.close()
+
+ val tempReopen = new File(temp.getAbsolutePath())
+ val setReopen = new FileMessageSet(tempReopen, true, 512 *1024 *1024, true)
+ val position = setReopen.channel.position
+ val size = setReopen.sizeInBytes()
+
+ assertEquals(oldposition, position)
+ assertEquals(oldposition, size)
+ assertEquals(oldposition, tempReopen.length)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index c31f884..19dcb47 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -45,6 +45,7 @@ class LogConfigTest extends JUnit3Suite {
case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
+ case LogConfig.PreAllocateEnableProp => expected.setProperty(name, randFrom("true", "false"))
case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
}
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 03fb351..abcd1f0 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -226,5 +226,57 @@ class LogSegmentTest extends JUnit3Suite {
seg.delete()
}
}
-
+
+ /* create a segment with pre allocate */
+ def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): LogSegment = {
+ val tempDir = TestUtils.tempDir()
+ val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
+ segments += seg
+ seg
+ }
+
+ /* create a segment with pre allocate, put message to it and verify */
+ @Test
+ def testCreateWithInitFileSizeAppendMessage() {
+ val seg = createSegment(40, false, 512*1024*1024, true)
+ val ms = messages(50, "hello", "there")
+ seg.append(50, ms)
+ val ms2 = messages(60, "alpha", "beta")
+ seg.append(60, ms2)
+ val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
+ assertEquals(ms2.toList, read.messageSet.toList)
+ }
+
+ /* create a segment with pre allocate and clearly shut down*/
+ @Test
+ def testCreateWithInitFileSizeClearShutdown() {
+ val tempDir = TestUtils.tempDir()
+ val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true)
+
+ val ms = messages(50, "hello", "there")
+ seg.append(50, ms)
+ val ms2 = messages(60, "alpha", "beta")
+ seg.append(60, ms2)
+ val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
+ assertEquals(ms2.toList, read.messageSet.toList)
+ val oldSize = seg.log.sizeInBytes()
+ val oldPosition = seg.log.channel.position
+ val oldFileSize = seg.log.file.length
+ assertEquals(512*1024*1024, oldFileSize)
+ seg.close()
+ //After close, file should be trimed
+ assertEquals(oldSize, seg.log.file.length)
+
+ val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true, 512*1024*1024, true)
+ segments += segReopen
+
+ val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
+ assertEquals(ms2.toList, readAgain.messageSet.toList)
+ val size = segReopen.log.sizeInBytes()
+ val position = segReopen.log.channel.position
+ val fileSize = segReopen.log.file.length
+ assertEquals(oldPosition, position)
+ assertEquals(oldSize, size)
+ assertEquals(size, fileSize)
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index 8268852..98a5b04 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -192,6 +192,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
case KafkaConfig.MinInSyncReplicasProp => expected.setProperty(name, atLeastOneIntProp)
case KafkaConfig.AutoLeaderRebalanceEnableProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
+ case KafkaConfig.LogPreAllocateProp => expected.setProperty(name, randFrom("true", "false"))
case KafkaConfig.InterBrokerSecurityProtocolProp => expected.setProperty(name, SecurityProtocol.PLAINTEXT.toString)
case KafkaConfig.InterBrokerProtocolVersionProp => expected.setProperty(name, ApiVersion.latestVersion.toString)