Posted to commits@kafka.apache.org by ju...@apache.org on 2015/06/19 16:52:44 UTC

kafka git commit: kafka-1646; Improve consumer read performance for Windows; patched by Honghai Chen; reviewed by Jay Kreps and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 19c98cb8e -> ca758252c


kafka-1646; Improve consumer read performance for Windows; patched by Honghai Chen; reviewed by Jay Kreps and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca758252
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca758252
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca758252

Branch: refs/heads/trunk
Commit: ca758252c5a524fe6135a585282dd4bf747afef2
Parents: 19c98cb
Author: Honghai Chen <wa...@163.com>
Authored: Fri Jun 19 07:52:37 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jun 19 07:52:37 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/log/FileMessageSet.scala   | 58 +++++++++++++++++++-
 core/src/main/scala/kafka/log/Log.scala         | 30 ++++++++--
 core/src/main/scala/kafka/log/LogCleaner.scala  |  2 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |  6 ++
 core/src/main/scala/kafka/log/LogSegment.scala  |  4 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  5 ++
 .../main/scala/kafka/server/KafkaServer.scala   |  1 +
 core/src/main/scala/kafka/utils/CoreUtils.scala | 10 ----
 .../unit/kafka/log/FileMessageSetTest.scala     | 55 ++++++++++++++++++-
 .../scala/unit/kafka/log/LogConfigTest.scala    |  1 +
 .../scala/unit/kafka/log/LogSegmentTest.scala   | 54 +++++++++++++++++-
 .../kafka/server/KafkaConfigConfigDefTest.scala |  1 +
 12 files changed, 204 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 2522604..39361fe 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -54,7 +54,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
   /* if this is not a slice, update the file pointer to the end of the file */
   if (!isSlice)
     /* set the file position to the last byte in the file */
-    channel.position(channel.size)
+    channel.position(math.min(channel.size().toInt, end))
 
   /**
    * Create a file message set with no slicing.
@@ -66,12 +66,25 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * Create a file message set with no slicing
    */
   def this(file: File) = 
-    this(file, CoreUtils.openChannel(file, mutable = true))
+    this(file, FileMessageSet.openChannel(file, mutable = true))
+
+  /**
+   * Create a file message set with no slicing, and with initFileSize and preallocate.
+   * On Windows NTFS and some older Linux filesystems, setting preallocate to true and initFileSize
+   * to a value such as 512 * 1024 * 1024 can improve Kafka produce performance.
+   * If the file is new and preallocate is true, end is set to 0; otherwise it is set to Int.MaxValue.
+   */
+  def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
+      this(file,
+        channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
+        start = 0,
+        end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
+        isSlice = false)
 
   /**
    * Create a file message set with mutable option
    */
-  def this(file: File, mutable: Boolean) = this(file, CoreUtils.openChannel(file, mutable))
+  def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable))
   
   /**
    * Create a slice view of the file message set that begins and ends at the given byte offsets
@@ -223,10 +236,18 @@ class FileMessageSet private[kafka](@volatile var file: File,
    */
   def close() {
     flush()
+    trim()
     channel.close()
   }
   
   /**
+   * Trim the file when closing it or rolling over to a new file
+   */
+  def trim() {    
+    truncateTo(sizeInBytes())
+  }
+  
+  /**
    * Delete this message set from the filesystem
    * @return True iff this message set was deleted.
    */
@@ -272,6 +293,37 @@ class FileMessageSet private[kafka](@volatile var file: File,
   }
   
 }
+  
+object FileMessageSet
+{
+  /**
+   * Open a channel for the given file
+   * On Windows NTFS and some older Linux filesystems, setting preallocate to true and initFileSize
+   * to a value such as 512 * 1024 * 1024 can improve Kafka produce performance.
+   * @param file File path
+   * @param mutable Whether the channel should be writable
+   * @param fileAlreadyExists Whether the file already exists
+   * @param initFileSize The size used to preallocate the file, for example 512 * 1024 * 1024
+   * @param preallocate Whether to preallocate the file, taken from the configuration.
+   */
+  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
+    if (mutable) {
+      if (fileAlreadyExists)
+        new RandomAccessFile(file, "rw").getChannel()
+      else {
+        if (preallocate) {
+          val randomAccessFile = new RandomAccessFile(file, "rw")
+          randomAccessFile.setLength(initFileSize)
+          randomAccessFile.getChannel()
+        }
+        else
+          new RandomAccessFile(file, "rw").getChannel()
+      }
+    }
+    else
+      new FileInputStream(file).getChannel()
+  }
+}
 
 object LogFlushStats extends KafkaMetricsGroup {
   val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
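
The preallocation trick in openChannel above is the heart of this patch. Below is a
minimal standalone sketch of the same pattern, with a hypothetical file name and size
(illustrative code, not part of the commit):

    import java.io.{File, RandomAccessFile}
    import java.nio.channels.FileChannel

    // Hypothetical segment file and init size, for illustration only.
    val file = new File("00000000000000000000.log")
    val initFileSize = 512 * 1024 * 1024

    // Grow the file to its full size up front. On Windows NTFS (and some older
    // Linux filesystems) this lets the OS lay the blocks out contiguously, so
    // later sequential writes and reads are faster.
    val raf = new RandomAccessFile(file, "rw")
    raf.setLength(initFileSize)

    // Writing still starts at position 0; the preallocated tail holds no data yet.
    val channel: FileChannel = raf.getChannel()
    channel.position(0)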

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 84e7b8f..6b9274d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -78,6 +78,13 @@ class Log(val dir: File,
   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(time.milliseconds)
 
+  def initFileSize() : Int = {
+    if (config.preallocate)
+      config.segmentSize
+    else
+      0
+  }
+
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
   loadSegments()
@@ -168,7 +175,8 @@ class Log(val dir: File,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,
                                      rollJitterMs = config.randomSegmentJitter,
-                                     time = time)
+                                     time = time,
+                                     fileAlreadyExists = true)
         if(!hasIndex) {
           error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
           segment.recover(config.maxMessageSize)
@@ -205,7 +213,10 @@ class Log(val dir: File,
                                      indexIntervalBytes = config.indexInterval, 
                                      maxIndexSize = config.maxIndexSize,
                                      rollJitterMs = config.randomSegmentJitter,
-                                     time = time))
+                                     time = time,
+                                     fileAlreadyExists = false,
+                                     initFileSize = this.initFileSize(),
+                                     preallocate = config.preallocate))
     } else {
       recoverLog()
       // reset the index size of the currently active log segment to allow more entries
@@ -586,14 +597,20 @@ class Log(val dir: File,
     
       segments.lastEntry() match {
         case null => 
-        case entry => entry.getValue.index.trimToValidSize()
+        case entry => {
+          entry.getValue.index.trimToValidSize()
+          entry.getValue.log.trim()
+        }
       }
       val segment = new LogSegment(dir, 
                                    startOffset = newOffset,
                                    indexIntervalBytes = config.indexInterval, 
                                    maxIndexSize = config.maxIndexSize,
                                    rollJitterMs = config.randomSegmentJitter,
-                                   time = time)
+                                   time = time,
+                                   fileAlreadyExists = false,
+                                   initFileSize = initFileSize,
+                                   preallocate = config.preallocate)
       val prev = addSegment(segment)
       if(prev != null)
         throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
@@ -687,7 +704,10 @@ class Log(val dir: File,
                                 indexIntervalBytes = config.indexInterval, 
                                 maxIndexSize = config.maxIndexSize,
                                 rollJitterMs = config.randomSegmentJitter,
-                                time = time))
+                                time = time,
+                                fileAlreadyExists = false,
+                                initFileSize = initFileSize,
+                                preallocate = config.preallocate))
       updateLogEndOffset(newOffset)
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
     }
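
Because a preallocated segment is created at its full size, the file length no longer
reflects how much valid data it holds, which is why the roll path above now calls
trim() on the previous segment before opening a new one. The idea in isolation, as a
sketch (trimToValidSize is a hypothetical helper, not part of the commit):

    import java.nio.channels.FileChannel

    // For a writable FileChannel whose position marks the end of the valid data,
    // e.g. the one opened in the preallocation sketch above.
    def trimToValidSize(channel: FileChannel): Unit = {
      val validBytes = channel.position() // end of the last appended message
      channel.truncate(validBytes)        // drop the unused preallocated tail
    }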

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index c9ade72..d07a391 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -344,7 +344,7 @@ private[log] class Cleaner(val id: Int,
     logFile.delete()
     val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
     indexFile.delete()
-    val messages = new FileMessageSet(logFile)
+    val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
     val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
     val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index e9af221..fc41132 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -43,6 +43,7 @@ object Defaults {
   val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
   val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
   val CompressionType = kafka.server.Defaults.CompressionType
+  val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props) {
@@ -64,6 +65,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
   val uncleanLeaderElectionEnable = getBoolean(LogConfig.UncleanLeaderElectionEnableProp)
   val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
   val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
+  val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -95,6 +97,7 @@ object LogConfig {
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
   val CompressionTypeProp = "compression.type"
+  val PreAllocateEnableProp = "preallocate"
 
   val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
   val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
@@ -118,6 +121,7 @@ object LogConfig {
   val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " +
     "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " +
     "no compression; and 'producer' which means retain the original compression codec set by the producer."
+  val PreAllocateEnableDoc = "Should we preallocate the file when creating a new segment?"
 
   private val configDef = {
     import ConfigDef.Range._
@@ -149,6 +153,8 @@ object LogConfig {
         MEDIUM, UncleanLeaderElectionEnableDoc)
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), MEDIUM, MinInSyncReplicasDoc)
       .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
+      .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable,
+        MEDIUM, PreAllocateEnableDoc)
   }
 
   def apply(): LogConfig = LogConfig(new Properties())
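
With the new property defined, preallocation can be enabled per topic. A small usage
sketch (every other property falls back to its default):

    import java.util.Properties
    import kafka.log.LogConfig

    val props = new Properties()
    props.put(LogConfig.PreAllocateEnableProp, "true") // the "preallocate" topic config
    val config = LogConfig(props)
    assert(config.preallocate) // segments for this topic will be preallocated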

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index ed03953..1377e8f 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -52,8 +52,8 @@ class LogSegment(val log: FileMessageSet,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
   
-  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
-    this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), 
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
+    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
          new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index e0b2480..c1f0cca 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -83,6 +83,7 @@ object Defaults {
   val LogDeleteDelayMs = 60000
   val LogFlushSchedulerIntervalMs = Long.MaxValue
   val LogFlushOffsetCheckpointIntervalMs = 60000
+  val LogPreAllocateEnable = false
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
@@ -206,6 +207,7 @@ object KafkaConfig {
   val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
   val LogFlushIntervalMsProp = "log.flush.interval.ms"
   val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
+  val LogPreAllocateProp = "log.preallocate"
   val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
   val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
@@ -332,6 +334,7 @@ object KafkaConfig {
   val LogFlushSchedulerIntervalMsDoc = "The frequency in ms that the log flusher checks whether any log needs to be flushed to disk"
   val LogFlushIntervalMsDoc = "The maximum time in ms that a message in any topic is kept in memory before flushed to disk"
   val LogFlushOffsetCheckpointIntervalMsDoc = "The frequency with which we update the persistent record of the last flush which acts as the log recovery point"
+  val LogPreAllocateEnableDoc = "Should we preallocate the file when creating a new segment? If you are using Kafka on Windows, you probably need to set it to true."
   val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
   val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
   val MinInSyncReplicasDoc = "define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)"
@@ -466,6 +469,7 @@ object KafkaConfig {
       .define(LogFlushSchedulerIntervalMsProp, LONG, Defaults.LogFlushSchedulerIntervalMs, HIGH, LogFlushSchedulerIntervalMsDoc)
       .define(LogFlushIntervalMsProp, LONG, HIGH, LogFlushIntervalMsDoc, false)
       .define(LogFlushOffsetCheckpointIntervalMsProp, INT, Defaults.LogFlushOffsetCheckpointIntervalMs, atLeast(0), HIGH, LogFlushOffsetCheckpointIntervalMsDoc)
+      .define(LogPreAllocateProp, BOOLEAN, Defaults.LogPreAllocateEnable, MEDIUM, LogPreAllocateEnableDoc)
       .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
       .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
@@ -609,6 +613,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
   val logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
   val logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
   val minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
+  val logPreAllocateEnable: Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
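
At the broker level the same behavior is controlled by the new log.preallocate
setting, which defaults to false. A Windows deployment would typically add the
following line to server.properties; the KafkaServer change below then copies this
broker default into the topic-level preallocate property:

    log.preallocate=true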

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9de2a6f..52dc728 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -443,6 +443,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         case KafkaConfig.MinInSyncReplicasProp => logProps.put(LogConfig.MinInSyncReplicasProp, entry.getValue)
         case KafkaConfig.CompressionTypeProp => logProps.put(LogConfig.CompressionTypeProp, entry.getValue)
         case KafkaConfig.UncleanLeaderElectionEnableProp => logProps.put(LogConfig.UncleanLeaderElectionEnableProp, entry.getValue)
+        case KafkaConfig.LogPreAllocateProp => logProps.put(LogConfig.PreAllocateEnableProp, entry.getValue)
         case _ => // we just leave those out
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index f5d704c..168a18d 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -70,16 +70,6 @@ object CoreUtils extends Logging {
     Utils.daemonThread(name, runnable(fun))
 
   /**
-   * Open a channel for the given file
-   */
-  def openChannel(file: File, mutable: Boolean): FileChannel = {
-    if(mutable)
-      new RandomAccessFile(file, "rw").getChannel()
-    else
-      new FileInputStream(file).getChannel()
-  }
-
-  /**
    * Do the given action and log any exceptions thrown without rethrowing them
    * @param log The log method to use for logging. E.g. logger.warn
    * @param action The action to execute

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index cec1cae..02cf668 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.log
 
+import java.io._
 import java.nio._
 import java.util.concurrent.atomic._
 import junit.framework.Assert._
@@ -146,5 +147,57 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
     assertEquals(List(message), messageSet.toList)
     assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
   }
-  
+
+  /**
+   * Test the new FileMessageSet with preallocate set to true
+   */
+  @Test
+  def testPreallocateTrue() {
+    val temp = tempFile()
+    val set = new FileMessageSet(temp, false, 512 *1024 *1024, true)
+    val position = set.channel.position
+    val size = set.sizeInBytes()
+    assertEquals(0, position)
+    assertEquals(0, size)
+    assertEquals(512 *1024 *1024, temp.length)
+  }
+
+  /**
+   * Test the new FileMessageSet with preallocate set to false
+   */
+  @Test
+  def testPreallocateFalse() {
+    val temp = tempFile()
+    val set = new FileMessageSet(temp, false, 512 *1024 *1024, false)
+    val position = set.channel.position
+    val size = set.sizeInBytes()
+    assertEquals(0, position)
+    assertEquals(0, size)
+    assertEquals(0, temp.length)
+  }
+
+  /**
+   * Test the new FileMessageSet with preallocate set to true and the file shut down cleanly; the file should be truncated to the end of the valid data.
+   */
+  @Test
+  def testPreallocateClearShutdown() {
+    val temp = tempFile()
+    val set = new FileMessageSet(temp, false, 512 *1024 *1024, true)
+    set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
+    val oldposition = set.channel.position
+    val oldsize = set.sizeInBytes()
+    assertEquals(messageSet.sizeInBytes, oldposition)
+    assertEquals(messageSet.sizeInBytes, oldsize)
+    set.close()
+
+    val tempReopen = new File(temp.getAbsolutePath())
+    val setReopen = new FileMessageSet(tempReopen, true, 512 *1024 *1024, true)
+    val position = setReopen.channel.position
+    val size = setReopen.sizeInBytes()
+
+    assertEquals(oldposition, position)
+    assertEquals(oldposition, size)
+    assertEquals(oldposition, tempReopen.length)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index c31f884..19dcb47 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -45,6 +45,7 @@ class LogConfigTest extends JUnit3Suite {
         case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
         case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
         case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
+        case LogConfig.PreAllocateEnableProp => expected.setProperty(name, randFrom("true", "false"))
         case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
       }
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 03fb351..abcd1f0 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -226,5 +226,57 @@ class LogSegmentTest extends JUnit3Suite {
       seg.delete()
     }
   }
-  
+
+  /* create a segment with preallocation */
+  def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): LogSegment = {
+    val tempDir = TestUtils.tempDir()
+    val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
+    segments += seg
+    seg
+  }
+
+  /* create a segment with preallocation, append messages to it, and verify */
+  @Test
+  def testCreateWithInitFileSizeAppendMessage() {
+    val seg = createSegment(40, false, 512*1024*1024, true)
+    val ms = messages(50, "hello", "there")
+    seg.append(50, ms)
+    val ms2 = messages(60, "alpha", "beta")
+    seg.append(60, ms2)
+    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
+    assertEquals(ms2.toList, read.messageSet.toList)
+  }
+
+  /* create a segment with preallocation and shut it down cleanly */
+  @Test
+  def testCreateWithInitFileSizeClearShutdown() {
+    val tempDir = TestUtils.tempDir()
+    val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true)
+
+    val ms = messages(50, "hello", "there")
+    seg.append(50, ms)
+    val ms2 = messages(60, "alpha", "beta")
+    seg.append(60, ms2)
+    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
+    assertEquals(ms2.toList, read.messageSet.toList)
+    val oldSize = seg.log.sizeInBytes()
+    val oldPosition = seg.log.channel.position
+    val oldFileSize = seg.log.file.length
+    assertEquals(512*1024*1024, oldFileSize)
+    seg.close()
+    // After close, the file should be trimmed
+    assertEquals(oldSize, seg.log.file.length)
+
+    val segReopen = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, true,  512*1024*1024, true)
+    segments += segReopen
+
+    val readAgain = segReopen.read(startOffset = 55, maxSize = 200, maxOffset = None)
+    assertEquals(ms2.toList, readAgain.messageSet.toList)
+    val size = segReopen.log.sizeInBytes()
+    val position = segReopen.log.channel.position
+    val fileSize = segReopen.log.file.length
+    assertEquals(oldPosition, position)
+    assertEquals(oldSize, size)
+    assertEquals(size, fileSize)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca758252/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index 8268852..98a5b04 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -192,6 +192,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
         case KafkaConfig.MinInSyncReplicasProp => expected.setProperty(name, atLeastOneIntProp)
         case KafkaConfig.AutoLeaderRebalanceEnableProp => expected.setProperty(name, randFrom("true", "false"))
         case KafkaConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
+        case KafkaConfig.LogPreAllocateProp => expected.setProperty(name, randFrom("true", "false"))
         case KafkaConfig.InterBrokerSecurityProtocolProp => expected.setProperty(name, SecurityProtocol.PLAINTEXT.toString)
         case KafkaConfig.InterBrokerProtocolVersionProp => expected.setProperty(name, ApiVersion.latestVersion.toString)