You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/12 03:26:37 UTC

[11/11] git commit: merge from 0.8 and resolve conflicts

Updated Branches:
  refs/heads/trunk ed36a7f07 -> 9249b76d1


merge from 0.8 and resolve conflicts


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9249b76d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9249b76d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9249b76d

Branch: refs/heads/trunk
Commit: 9249b76d1d04f2583843cf5fe09ba8bbdf611183
Parents: ed36a7f a409531
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Jan 11 18:25:56 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 11 18:25:56 2013 -0800

----------------------------------------------------------------------
 config/consumer.properties                         |    2 +-
 config/log4j.properties                            |   45 +++++++--
 config/producer.properties                         |   25 +----
 config/server.properties                           |   34 +++----
 .../main/java/kafka/etl/impl/DataGenerator.java    |    2 +-
 .../kafka/bridge/hadoop/KafkaOutputFormat.java     |    3 +-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |    5 +-
 .../src/main/scala/kafka/api/ProducerRequest.scala |    1 -
 .../main/scala/kafka/api/StopReplicaRequest.scala  |    4 +-
 .../scala/kafka/api/TopicMetadataRequest.scala     |   20 ++--
 core/src/main/scala/kafka/client/ClientUtils.scala |   15 ++-
 core/src/main/scala/kafka/cluster/Partition.scala  |   10 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala     |   18 ++--
 .../main/scala/kafka/consumer/ConsumerConfig.scala |   32 +++---
 .../kafka/consumer/ConsumerFetcherThread.scala     |   10 +-
 .../consumer/ZookeeperConsumerConnector.scala      |   20 ++--
 .../controller/ControllerChannelManager.scala      |    6 +-
 .../scala/kafka/controller/KafkaController.scala   |    4 +-
 .../kafka/controller/PartitionStateMachine.scala   |    5 +-
 .../kafka/controller/ReplicaStateMachine.scala     |    2 +-
 .../scala/kafka/javaapi/TopicMetadataRequest.scala |   11 ++-
 core/src/main/scala/kafka/log/FileMessageSet.scala |    5 +-
 core/src/main/scala/kafka/log/Log.scala            |   18 +++-
 core/src/main/scala/kafka/log/LogManager.scala     |   36 ++++----
 core/src/main/scala/kafka/log/OffsetIndex.scala    |    8 +-
 .../main/scala/kafka/network/RequestChannel.scala  |   11 ++-
 .../main/scala/kafka/network/SocketServer.scala    |    2 +-
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    8 +-
 .../scala/kafka/producer/ConsoleProducer.scala     |   61 +++++++++---
 .../scala/kafka/producer/DefaultPartitioner.scala  |    5 +-
 .../scala/kafka/producer/KafkaLog4jAppender.scala  |    4 +-
 core/src/main/scala/kafka/producer/Producer.scala  |   12 +-
 .../main/scala/kafka/producer/ProducerConfig.scala |   29 ++++--
 .../main/scala/kafka/producer/SyncProducer.scala   |    2 +-
 .../scala/kafka/producer/SyncProducerConfig.scala  |   10 +-
 .../kafka/producer/async/AsyncProducerConfig.scala |    8 +-
 .../kafka/producer/async/DefaultEventHandler.scala |   76 ++++++++++-----
 .../scala/kafka/server/AbstractFetcherThread.scala |    6 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   54 ++++++----
 core/src/main/scala/kafka/server/KafkaConfig.scala |   74 +++++++-------
 core/src/main/scala/kafka/server/KafkaServer.scala |    4 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    8 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |    6 +-
 .../scala/kafka/tools/ConsumerOffsetChecker.scala  |   65 ++++++-------
 .../main/scala/kafka/tools/KafkaMigrationTool.java |    4 +-
 .../main/scala/kafka/tools/ReplayLogProducer.scala |   14 ++--
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    6 +-
 .../scala/other/kafka/TestEndToEndLatency.scala    |    4 +-
 .../scala/other/kafka/TestLogPerformance.scala     |    2 +-
 .../scala/other/kafka/TestZKConsumerOffsets.scala  |    2 +-
 .../api/RequestResponseSerializationTest.scala     |    6 +-
 .../kafka/integration/AutoOffsetResetTest.scala    |    4 +-
 .../integration/ProducerConsumerTestHarness.scala  |    8 +-
 .../unit/kafka/integration/TopicMetadataTest.scala |   33 +++++--
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   18 ++--
 core/src/test/scala/unit/kafka/log/LogTest.scala   |   56 ++++++------
 .../unit/kafka/producer/AsyncProducerTest.scala    |   20 ++--
 .../scala/unit/kafka/producer/ProducerTest.scala   |   16 ++--
 .../unit/kafka/producer/SyncProducerTest.scala     |   17 ++--
 .../unit/kafka/server/ISRExpirationTest.scala      |   10 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |    2 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |    6 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |    8 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   22 ++--
 .../src/main/java/kafka/examples/Consumer.java     |    8 +-
 .../scala/kafka/perf/ConsumerPerformance.scala     |   12 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |   15 ++--
 .../config/mirror_producer.properties              |    4 +-
 .../config/mirror_producer1.properties             |    4 +-
 .../config/mirror_producer2.properties             |    4 +-
 .../config/mirror_producer3.properties             |    4 +-
 .../config/server_source1.properties               |   20 ++--
 .../config/server_source2.properties               |   20 ++--
 .../config/server_source3.properties               |   20 ++--
 .../config/server_source4.properties               |   20 ++--
 .../config/server_target1.properties               |   20 ++--
 .../config/server_target2.properties               |   20 ++--
 .../config/server_target3.properties               |   20 ++--
 .../config/whitelisttest.consumer.properties       |    6 +-
 system_test/common/util.sh                         |    8 +-
 .../config/migration_producer.properties           |   29 +-----
 .../config/server.properties                       |   36 ++++----
 .../config/blacklisttest.consumer.properties       |    6 +-
 .../mirror_maker/config/mirror_producer.properties |    4 +-
 .../config/server_source_1_1.properties            |   20 ++--
 .../config/server_source_1_2.properties            |   20 ++--
 .../config/server_source_2_1.properties            |   20 ++--
 .../config/server_source_2_2.properties            |   20 ++--
 .../config/server_target_1_1.properties            |   20 ++--
 .../config/server_target_1_2.properties            |   20 ++--
 .../config/whitelisttest_1.consumer.properties     |    6 +-
 .../config/whitelisttest_2.consumer.properties     |    6 +-
 .../config/mirror_consumer.properties              |   20 ++--
 .../config/mirror_producer.properties              |    2 +-
 .../config/server.properties                       |   56 ++++++------
 system_test/producer_perf/config/server.properties |   20 ++--
 .../replication_testsuite/config/server.properties |   56 ++++++------
 98 files changed, 855 insertions(+), 759 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index b813b12,42a9628..cca6a11
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@@ -112,14 -112,11 +112,14 @@@ private[kafka] class ZookeeperConsumerC
  
    connectZk()
    createFetcher()
-   if (config.autoCommit) {
+   if (config.autoCommitEnable) {
      scheduler.startup
      info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
 -    scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
 -      config.autoCommitIntervalMs, false)
 +    scheduler.schedule("kafka-consumer-autocommit", 
 +                       autoCommit, 
 +                       delay = config.autoCommitIntervalMs,
 +                       period = config.autoCommitIntervalMs, 
 +                       unit = TimeUnit.MILLISECONDS)
    }
  
    KafkaMetricsReporter.startReporters(config.props)
@@@ -163,8 -160,8 +163,8 @@@
        if (wildcardTopicWatcher != null)
          wildcardTopicWatcher.shutdown()
        try {
-         if (config.autoCommit)
+         if (config.autoCommitEnable)
 -          scheduler.shutdownNow()
 +          scheduler.shutdown()
          fetcher match {
            case Some(f) => f.shutdown
            case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/FileMessageSet.scala
index 5284026,5845bb6..37e8d87
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@@ -40,38 -36,28 +40,41 @@@ import kafka.metrics.{KafkaTimer, Kafka
  @nonthreadsafe
  class FileMessageSet private[kafka](val file: File,
                                      private[log] val channel: FileChannel,
 -                                    private[log] val start: Int = 0,
 -                                    private[log] val limit: Int = Int.MaxValue,
 -                                    initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging {
 +                                    private[log] val start: Int,
 +                                    private[log] val end: Int,
 +                                    isSlice: Boolean) extends MessageSet with Logging {
    
    /* the size of the message set in bytes */
 -  private val _size = new AtomicInteger(scala.math.min(channel.size().toInt, limit) - start)
 +  private val _size = 
 +    if(isSlice)
 +      new AtomicInteger(end - start) // don't check the file size if this is just a slice view
 +    else
 +      new AtomicInteger(math.min(channel.size().toInt, end) - start)
  
 -  if (initChannelPositionToEnd) {
 +  /* if this is not a slice, update the file pointer to the end of the file */
-   if (!isSlice)
++  if (!isSlice) {
+     info("Creating or reloading log segment %s".format(file.getAbsolutePath))
+     /* set the file position to the last byte in the file */
      channel.position(channel.size)
+   }
  
    /**
 -   * Create a file message set with no limit or offset
 +   * Create a file message set with no slicing.
     */
 -  def this(file: File, channel: FileChannel) = this(file, channel, 0, Int.MaxValue)
 +  def this(file: File, channel: FileChannel) = 
 +    this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)
    
    /**
 -   * Create a file message set with no limit or offset
 +   * Create a file message set with no slicing
     */
 -  def this(file: File) = this(file, Utils.openChannel(file, mutable = true))
 +  def this(file: File) = 
 +    this(file, Utils.openChannel(file, mutable = true))
 +  
 +  /**
 +   * Create a slice view of the file message set that begins and ends at the given byte offsets
 +   */
 +  def this(file: File, channel: FileChannel, start: Int, end: Int) = 
 +    this(file, channel, start, end, isSlice = true)
    
    /**
     * Return a message set which is a view into this set starting from the given position and with the given size limit.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index 1654dbf,79db610..f6a32b6
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -366,52 -428,59 +366,58 @@@ class Log(val dir: File
    def logEndOffset: Long = nextOffset.get
  
    /**
 -   * Roll the log over if necessary
 +   * Roll the log over to a new empty log segment if necessary
 +   * @return The currently active segment after (perhaps) rolling to a new segment
     */
 -  private def maybeRoll(segment: LogSegment): LogSegment = {
 -    if(segment.messageSet.sizeInBytes > maxLogFileSize) {
 +  private def maybeRoll(): LogSegment = {
 +    val segment = activeSegment
-     if ((segment.size > maxSegmentSize) ||
-        (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) ||
-        segment.index.isFull)
++    if (segment.size > maxSegmentSize) {
+       info("Rolling %s due to full data log".format(name))
        roll()
-     else
 -    } else if((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) {
++    } else if (segment.size > 0 && time.milliseconds - segment.created > rollIntervalMs) {
+       info("Rolling %s due to time based rolling".format(name))
+       roll()
 -    } else if(segment.index.isFull) {
++    } else if (segment.index.isFull) {
+       info("Rolling %s due to full index maxIndexSize = %d, entries = %d, maxEntries = %d"
+         .format(name, segment.index.maxIndexSize, segment.index.entries(), segment.index.maxEntries))
+       roll()
+     } else
        segment
    }
 -
 +  
    /**
 -   * Create a new segment and make it active, and return it
 +   * Roll the log over to a new active segment starting with the current logEndOffset.
 +   * This will trim the index to the exact size of the number of entries it currently contains.
 +   * @return The newly rolled segment
     */
    def roll(): LogSegment = {
      lock synchronized {
 -      flush()
 -      rollToOffset(logEndOffset)
 -    }
 -  }
 +      // flush the log to ensure that only the active segment needs to be recovered
 +      if(!segments.isEmpty())
 +        flush()
    
 -  /**
 -   * Roll the log over to the given new offset value
 -   */
 -  private def rollToOffset(newOffset: Long): LogSegment = {
 -    val logFile = logFilename(dir, newOffset)
 -    val indexFile = indexFilename(dir, newOffset)
 -    for(file <- List(logFile, indexFile); if file.exists) {
 -      warn("Newly rolled segment file " + file.getAbsolutePath + " already exists; deleting it first")
 -      file.delete()
 -    }
 -    info("Rolling log '" + name + "' to " + logFile.getAbsolutePath + " and " + indexFile.getAbsolutePath)
 -    segments.view.lastOption match {
 -      case Some(segment) => segment.index.trimToValidSize()
 -      case None => 
 +      val newOffset = logEndOffset
 +      val logFile = logFilename(dir, newOffset)
 +      val indexFile = indexFilename(dir, newOffset)
 +      for(file <- List(logFile, indexFile); if file.exists) {
 +        warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
 +        file.delete()
 +      }
 +    
-       debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
++      info("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
 +      segments.lastEntry() match {
 +        case null => 
 +        case entry => entry.getValue.index.trimToValidSize()
 +      }
 +      val segment = new LogSegment(dir, 
 +                                   startOffset = newOffset,
 +                                   indexIntervalBytes = indexIntervalBytes, 
 +                                   maxIndexSize = maxIndexSize)
 +      val prev = segments.put(segment.baseOffset, segment)
 +      if(prev != null)
 +        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(dir.getName, newOffset))
 +      segment
      }
 -
 -    val segmentsView = segments.view
 -    if(segmentsView.size > 0 && segmentsView.last.start == newOffset)
 -      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists".format(dir.getName, newOffset))
 -
 -    val segment = new LogSegment(dir, 
 -                                 startOffset = newOffset,
 -                                 indexIntervalBytes = indexIntervalBytes, 
 -                                 maxIndexSize = maxIndexSize)
 -    segments.append(segment)
 -    segment
    }
  
    /**
@@@ -439,34 -507,104 +445,34 @@@
       }
    }
  
 -  def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
 -    val segsArray = segments.view
 -    var offsetTimeArray: Array[(Long, Long)] = null
 -    if(segsArray.last.size > 0)
 -      offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
 -    else
 -      offsetTimeArray = new Array[(Long, Long)](segsArray.length)
 -
 -    for(i <- 0 until segsArray.length)
 -      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified)
 -    if(segsArray.last.size > 0)
 -      offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)
 -
 -    var startIndex = -1
 -    timestamp match {
 -      case OffsetRequest.LatestTime =>
 -        startIndex = offsetTimeArray.length - 1
 -      case OffsetRequest.EarliestTime =>
 -        startIndex = 0
 -      case _ =>
 -        var isFound = false
 -        debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
 -        startIndex = offsetTimeArray.length - 1
 -        while (startIndex >= 0 && !isFound) {
 -          if (offsetTimeArray(startIndex)._2 <= timestamp)
 -            isFound = true
 -          else
 -            startIndex -=1
 -        }
 -    }
 -
 -    val retSize = maxNumOffsets.min(startIndex + 1)
 -    val ret = new Array[Long](retSize)
 -    for(j <- 0 until retSize) {
 -      ret(j) = offsetTimeArray(startIndex)._1
 -      startIndex -= 1
 -    }
 -    // ensure that the returned seq is in descending order of offsets
 -    ret.toSeq.sortBy(- _)
 -  }
 -
 +  /**
 +   * Completely delete this log directory and all contents from the file system with no delay
 +   */
    def delete(): Unit = {
 -    deleteSegments(segments.contents.get())
 +    logSegments.foreach(_.delete())
      Utils.rm(dir)
    }
-   
+ 
 -
 -  /* Attempts to delete all provided segments from a log and returns how many it was able to */
 -  def deleteSegments(segments: Seq[LogSegment]): Int = {
 -    var total = 0
 -    for(segment <- segments) {
 -      info("Deleting log segment " + segment.start + " from " + name)
 -      val deletedLog = segment.messageSet.delete()
 -      val deletedIndex = segment.index.delete()
 -      if(!deletedIndex || !deletedLog) {
 -        throw new KafkaStorageException("Deleting log segment " + segment.start + " failed.")
 -      } else {
 -        total += 1
 -      }
 -      if(segment.messageSet.file.exists())
 -        error("Data log file %s still exists".format(segment.messageSet.file.getAbsolutePath))
 -      if(segment.index.file.exists())
 -        error("Index file %s still exists".format(segment.index.file.getAbsolutePath))
 -    }
 -    total
 -  }
 -  
 +  /**
 +   * Truncate this log so that it ends with the greatest offset < targetOffset.
 +   * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
 +   */
    def truncateTo(targetOffset: Long) {
 +    info("Truncating log %s to offset %d.".format(name, targetOffset))
      if(targetOffset < 0)
        throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
 +    if(targetOffset > logEndOffset) {
 +      info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
 +      return
 +    }
      lock synchronized {
 -      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
 -      val viewSize = segments.view.size
 -      val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
 -      /* We should not hit this error because segments.view is locked in markedDeletedWhile() */
 -      if(numSegmentsDeleted != segmentsToBeDeleted.size)
 -        error("Failed to delete some segments when attempting to truncate to offset " + targetOffset +")")
 -      if (numSegmentsDeleted == viewSize) {
 -        segments.trunc(segments.view.size)
 -        rollToOffset(targetOffset)
 -        this.nextOffset.set(targetOffset)
 +      if(segments.firstEntry.getValue.baseOffset > targetOffset) {
 +        truncateFullyAndStartAt(targetOffset)
        } else {
 -        if(targetOffset > logEndOffset) {
 -          error("Target offset %d cannot be greater than the last message offset %d in the log %s".
 -                format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath))
 -        } else {
 -          // find the log segment that has this hw
 -          val segmentToBeTruncated = findRange(segments.view, targetOffset)
 -          segmentToBeTruncated match {
 -            case Some(segment) =>
 -              val truncatedSegmentIndex = segments.view.indexOf(segment)
 -              segments.truncLast(truncatedSegmentIndex)
 -              segment.truncateTo(targetOffset)
 -              this.nextOffset.set(targetOffset)
 -              info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset))
 -            case None => // nothing to do
 -          }
 -        }
 +        val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
 +        deletable.foreach(deleteSegment(_))
 +        activeSegment.truncateTo(targetOffset)
 +        this.nextOffset.set(targetOffset)
        }
      }
    }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/LogManager.scala
index 2ea9afe,497cfdd..c5ab8a2
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@@ -42,17 -42,16 +42,17 @@@ class LogManager(val config: KafkaConfi
  
    val CleanShutdownFile = ".kafka_cleanshutdown"
    val LockFile = ".lock"
 +  val InitialTaskDelayMs = 30*1000
    val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
-   private val logFileSizeMap = config.logFileSizeMap
-   private val logFlushInterval = config.flushInterval
-   private val logFlushIntervals = config.flushIntervalMap
+   private val logFileSizeMap = config.logSegmentBytesPerTopicMap
+   private val logFlushInterval = config.logFlushIntervalMessages
+   private val logFlushIntervals = config.logFlushIntervalMsPerTopicMap
    private val logCreationLock = new Object
-   private val logRetentionSizeMap = config.logRetentionSizeMap
-   private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
-   private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
+   private val logRetentionSizeMap = config.logRetentionBytesPerTopicMap
+   private val logRetentionMsMap = config.logRetentionHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
+   private val logRollMsMap = config.logRollHoursPerTopicMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
    private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
-   private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
+   private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMins
    private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
  
    this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
@@@ -115,18 -111,17 +115,18 @@@
              info("Loading log '" + dir.getName + "'")
              val topicPartition = parseTopicPartitionName(dir.getName)
              val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-             val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
+             val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logSegmentBytes)
 -            val log = new Log(dir, 
 -                              maxLogFileSize, 
 +            val log = new Log(dir,
 +                              scheduler,
-                               maxLogFileSize, 
-                               config.maxMessageSize, 
++                              maxLogFileSize,
+                               config.messageMaxBytes,
                                logFlushInterval, 
                                rollIntervalMs, 
                                needsRecovery, 
-                               config.logIndexMaxSizeBytes,
+                               config.logIndexSizeMaxBytes,
                                config.logIndexIntervalBytes,
 -                              time, 
 -                              config.brokerId)
 +                              config.logDeleteDelayMs,
 +                              time)
              val previous = this.logs.put(topicPartition, log)
              if(previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
@@@ -142,19 -137,12 +142,19 @@@
    def startup() {
      /* Schedule the cleanup task to delete old logs */
      if(scheduler != null) {
 -      info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
 -      scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
 -      info("Starting log flusher every " + config.logFlushSchedulerIntervalMs +
 -                   " ms with the following overrides " + logFlushIntervals)
 -      scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
 -                                 config.logFlushSchedulerIntervalMs, config.logFlushSchedulerIntervalMs, false)
 +      info("Starting log cleanup with a period of %d ms.".format(logCleanupIntervalMs))
 +      scheduler.schedule("kafka-log-cleaner", 
 +                         cleanupLogs, 
 +                         delay = InitialTaskDelayMs, 
 +                         period = logCleanupIntervalMs, 
 +                         TimeUnit.MILLISECONDS)
 +      info("Starting log flusher with a default period of %d ms with the following overrides: %s."
-           .format(config.defaultFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", ")))
++          .format(config.logFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", ")))
 +      scheduler.schedule("kafka-log-flusher", 
 +                         flushDirtyLogs, 
 +                         delay = InitialTaskDelayMs, 
-                          period = config.flushSchedulerThreadRate, 
++                         period = config.logFlushSchedulerIntervalMs,
 +                         TimeUnit.MILLISECONDS)
      }
    }
    
@@@ -198,18 -186,17 +198,18 @@@
        val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
        dir.mkdirs()
        val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-       val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
+       val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logSegmentBytes)
        log = new Log(dir, 
 +                    scheduler,
                      maxLogFileSize, 
-                     config.maxMessageSize, 
+                     config.messageMaxBytes,
                      logFlushInterval, 
                      rollIntervalMs, 
                      needsRecovery = false, 
-                     config.logIndexMaxSizeBytes, 
+                     config.logIndexSizeMaxBytes,
                      config.logIndexIntervalBytes, 
 -                    time, 
 -                    config.brokerId)
 +                    config.logDeleteDelayMs,
 +                    time)
        info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
        logs.put(topicAndPartition, log)
        log
@@@ -252,9 -249,8 +252,9 @@@
     */
    private def cleanupSegmentsToMaintainSize(log: Log): Int = {
      val topic = parseTopicPartitionName(log.dir.getName).topic
-     val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize)
+     val maxLogRetentionSize = logRetentionSizeMap.get(topic).getOrElse(config.logRetentionBytes)
 -    if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0
 +    if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize)
 +      return 0
      var diff = log.size - maxLogRetentionSize
      def shouldDelete(segment: LogSegment) = {
        if(diff - segment.size >= 0) {
@@@ -309,15 -307,14 +309,15 @@@
     */
    private def flushDirtyLogs() = {
      debug("Checking for dirty logs to flush...")
 -    for (log <- allLogs) {
 +    for ((topicAndPartition, log) <- logs) {
        try {
 -        val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
 +        val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
 +        
-         var logFlushInterval = config.defaultFlushIntervalMs
+         var logFlushInterval = config.logFlushIntervalMs
 -        if(logFlushIntervals.contains(log.topicName))
 -          logFlushInterval = logFlushIntervals(log.topicName)
 -        debug(log.topicName + " flush interval  " + logFlushInterval +
 -                      " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush)
 +        if(logFlushIntervals.contains(topicAndPartition.topic))
 +          logFlushInterval = logFlushIntervals(topicAndPartition.topic)
 +        debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval  " + logFlushInterval +
 +              " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
          if(timeSinceLastFlush >= logFlushInterval)
            log.flush
        } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/OffsetIndex.scala
index 0e18f28,43b3575..1662f10
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@@ -83,18 -83,19 +83,21 @@@ class OffsetIndex(val file: File, val b
          Utils.swallow(raf.close())
        }
      }
-   
+ 
 -  /* the number of entries in the index */
 -  private var size = new AtomicInteger(mmap.position / 8)
 -  
 -  /* the last offset in the index */
 -  var lastOffset = readLastOffset()
 -  
+   info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d"
+     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset))
+ 
 -  /* the maximum number of entries this index can hold */
 +  /**
 +   * The maximum number of eight-byte entries this index can hold
 +   */
    def maxEntries = mmap.limit / 8
 +  
 +  /* the number of eight-byte entries currently in the index */
 +  private var size = new AtomicInteger(mmap.position / 8)
 +  
 +  /* the last offset in the index */
 +  var lastOffset = readLastOffset()
-   
+ 
    /**
     * The last offset written to the index
     */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaApis.scala
index 4283973,60752fb..a166a1c
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@@ -21,8 -21,6 +21,8 @@@ import kafka.admin.{CreateTopicCommand
  import kafka.api._
  import kafka.message._
  import kafka.network._
 +import kafka.log._
- import kafka.utils.{Pool, SystemTime, Logging, ZkUtils, ZKGroupTopicDirs}
++import kafka.utils.ZKGroupTopicDirs
  import org.apache.log4j.Logger
  import scala.collection._
  import kafka.network.RequestChannel.Response
@@@ -39,13 -38,12 +40,13 @@@ import kafka.utils.{ZkUtils, Pool, Syst
  class KafkaApis(val requestChannel: RequestChannel,
                  val replicaManager: ReplicaManager,
                  val zkClient: ZkClient,
 -                brokerId: Int) extends Logging {
 +                val brokerId: Int,
 +                val config: KafkaConfig) extends Logging {
  
    private val producerRequestPurgatory =
-     new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
+     new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
    private val fetchRequestPurgatory =
-     new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchRequestPurgatoryPurgeInterval)
+     new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
    private val delayedRequestMetrics = new DelayedRequestMetrics
  
    private val requestLogger = Logger.getLogger("kafka.request.logger")

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaConfig.scala
index ccc35d3,f65db33..51ea727
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@@ -36,22 -36,19 +36,22 @@@ class KafkaConfig private (val props: V
    /*********** General Configuration ***********/
    
    /* the broker id for this server */
-   val brokerId: Int = props.getIntInRange("brokerid", (0, Int.MaxValue))
+   val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
  
    /* the maximum size of message that the server can receive */
-   val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
+   val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000, (0, Int.MaxValue))
    
    /* the number of network threads that the server uses for handling network requests */
-   val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
+   val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
  
    /* the number of io threads that the server uses for carrying out network requests */
-   val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
+   val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
    
 +  /* the number of threads to use for various background processing tasks */
 +  val backgroundThreads = props.getIntInRange("background.threads", 4, (1, Int.MaxValue))
 +  
    /* the number of queued requests allowed before blocking the network threads */
-   val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
+   val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
    
    /*********** Socket Server Configuration ***********/
    
@@@ -114,21 -111,19 +114,21 @@@
    val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
  
    /* the number of messages accumulated on a log partition before messages are flushed to disk */
-   val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
-   
+   val logFlushIntervalMessages = props.getIntInRange("log.flush.interval.messages", 500, (1, Int.MaxValue))
+ 
 +  val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
 +
    /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
-   val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
+   val logFlushIntervalMsPerTopicMap = props.getMap("log.flush.interval.ms.per.topic", _.toInt > 0).mapValues(_.toInt)
  
    /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
-   val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms",  3000)
+   val logFlushSchedulerIntervalMs = props.getInt("log.flush.scheduler.interval.ms",  3000)
  
    /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
-   val defaultFlushIntervalMs = props.getInt("log.default.flush.interval.ms", flushSchedulerThreadRate)
+   val logFlushIntervalMs = props.getInt("log.flush.interval.ms", logFlushSchedulerIntervalMs)
  
    /* enable auto creation of topic on the server */
-   val autoCreateTopics = props.getBoolean("auto.create.topics", true)
+   val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
  
    /*********** Replication configuration ***********/
  
@@@ -162,20 -159,15 +164,20 @@@
  
    /* number of fetcher threads used to replicate messages from a source broker.
     * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
-   val numReplicaFetchers = props.getInt("replica.fetchers", 1)
+   val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
    
-   /* the frequency with which the highwater mark is saved out to disk */
-   val highWaterMarkCheckpointIntervalMs = props.getLong("replica.highwatermark.checkpoint.ms", 5000L)
+   /* the frequency with which the high watermark is saved out to disk */
+   val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
  
    /* the purge interval (in number of requests) of the fetch request purgatory */
-   val fetchRequestPurgatoryPurgeInterval = props.getInt("fetch.purgatory.purge.interval", 10000)
+   val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 10000)
  
    /* the purge interval (in number of requests) of the producer request purgatory */
-   val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+   val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
  
 +  /*********** Misc configuration ***********/
 +  
 +  /* the maximum size for a metadata entry associated with an offset commit */
 +  val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", 1024)
 +
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/ReplicaManager.scala
index 6a8213c,064af6b..7810c21
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@@ -72,7 -72,7 +72,7 @@@ class ReplicaManager(val config: KafkaC
  
    def startHighWaterMarksCheckPointThread() = {
      if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
-       scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.highWaterMarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
 -      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.replicaHighWatermarkCheckpointIntervalMs)
++      scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
    }
  
    /**
@@@ -91,7 -91,7 +91,7 @@@
  
    def startup() {
      // start ISR expiration thread
-     scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaMaxLagTimeMs, unit = TimeUnit.MILLISECONDS)
 -    kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaLagTimeMaxMs)
++    scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
    }
  
    def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/other/kafka/TestLogPerformance.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/other/kafka/TestLogPerformance.scala
index 67fb168,9f3bb40..a7b661a
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala
@@@ -33,8 -33,7 +33,8 @@@ object TestLogPerformance 
      val props = TestUtils.createBrokerConfig(0, -1)
      val config = new KafkaConfig(props)
      val dir = TestUtils.tempDir()
 -    val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
 +    val scheduler = new KafkaScheduler(1)
-     val log = new Log(dir, scheduler, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime)
++    val log = new Log(dir, scheduler, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime)
      val bytes = new Array[Byte](messageSize)
      new java.util.Random().nextBytes(bytes)
      val message = new Message(bytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7fc154f,ce893bf..f48c709
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@@ -39,11 -40,12 +39,11 @@@ class LogManagerTest extends JUnit3Suit
    override def setUp() {
      super.setUp()
      config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
-                    override val logFileSize = 1024
-                    override val flushInterval = 10000
+                    override val logSegmentBytes = 1024
+                    override val logFlushIntervalMessages = 10000
                     override val logRetentionHours = maxLogAgeHours
                   }
 -    scheduler.startup
 -    logManager = new LogManager(config, scheduler, time)
 +    logManager = new LogManager(config, time.scheduler, time)
      logManager.startup
      logDir = logManager.logDirs(0)
    }
@@@ -119,11 -112,15 +119,11 @@@
      val props = TestUtils.createBrokerConfig(0, -1)
      logManager.shutdown()
      config = new KafkaConfig(props) {
-       override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
-       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
+       override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages
+       override val logRetentionBytes = (5 * 10 * setSize + 10).asInstanceOf[Long]
 -      override val logRetentionHours = retentionHours
 -      override val logFlushIntervalMessages = 100
        override val logRollHours = maxRollInterval
      }
 -    logManager = new LogManager(config, scheduler, time)
 +    logManager = new LogManager(config, time.scheduler, time)
      logManager.startup
  
      // create a log
@@@ -131,21 -128,20 +131,21 @@@
      var offset = 0L
  
      // add a bunch of messages that should be larger than the retentionSize
 -    for(i <- 0 until 1000) {
 +    val numMessages = 200
 +    for(i <- 0 until numMessages) {
        val set = TestUtils.singleMessageSet("test".getBytes())
 -      val (start, end) = log.append(set)
 -      offset = start
 +      val info = log.append(set)
 +      offset = info.firstOffset
      }
 -    // flush to make sure it's written to disk
 -    log.flush
  
      // should be exactly 100 full segments + 1 new empty one
-     assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.logFileSize, log.numberOfSegments)
 -    assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
++    assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.logSegmentBytes, log.numberOfSegments)
  
      // this cleanup shouldn't find any expired segments but should delete some to reduce size
 -    logManager.cleanupLogs()
 +    time.sleep(logManager.InitialTaskDelayMs)
      assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
 +    time.sleep(log.segmentDeleteDelayMs + 1)
 +    assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
      assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
      try {
        log.read(0, 1024)
@@@ -165,14 -158,15 +165,14 @@@
      val props = TestUtils.createBrokerConfig(0, -1)
      logManager.shutdown()
      config = new KafkaConfig(props) {
-                    override val flushSchedulerThreadRate = 1000
-                    override val defaultFlushIntervalMs = 1000
-                    override val flushInterval = Int.MaxValue
 -                   override val logSegmentBytes = 1024 *1024 *1024
 -                   override val logFlushSchedulerIntervalMs = 50
++                   override val logFlushSchedulerIntervalMs = 1000
++                   override val logFlushIntervalMs = 1000
+                    override val logFlushIntervalMessages = Int.MaxValue
 -                   override val logRollHours = maxRollInterval
 -                   override val logFlushIntervalMsPerTopicMap = Map("timebasedflush" -> 100)
                   }
 -    logManager = new LogManager(config, scheduler, time)
 +    logManager = new LogManager(config, time.scheduler, time)
      logManager.startup
      val log = logManager.getOrCreateLog(name, 0)
 +    val lastFlush = log.lastFlushTime
      for(i <- 0 until 200) {
        var set = TestUtils.singleMessageSet("test".getBytes())
        log.append(set)
@@@ -191,9 -183,9 +191,9 @@@
      val dirs = Seq(TestUtils.tempDir().getAbsolutePath, 
                     TestUtils.tempDir().getAbsolutePath, 
                     TestUtils.tempDir().getAbsolutePath)
-     props.put("log.directories", dirs.mkString(","))
+     props.put("log.dirs", dirs.mkString(","))
      logManager.shutdown()
 -    logManager = new LogManager(new KafkaConfig(props), scheduler, time)
 +    logManager = new LogManager(new KafkaConfig(props), time.scheduler, time)
      
      // verify that logs are always assigned to the least loaded partition
      for(partition <- 0 until 20) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/log/LogTest.scala
index 109474c,786ae03..0fc74fa
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@@ -65,8 -62,8 +65,8 @@@ class LogTest extends JUnitSuite 
      val time: MockTime = new MockTime()
  
      // create a log
-     val log = new Log(logDir, time.scheduler, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time)
 -    val log = new Log(logDir, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time)
 -    time.currentMs += rollMs + 1
++    val log = new Log(logDir, time.scheduler, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time)
 +    time.sleep(rollMs + 1)
  
      // segment age is less than its limit
      log.append(set)
@@@ -98,7 -96,7 +98,7 @@@
      val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
  
      // create a log
-     val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
 -    val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
      assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
  
      // segments expire in size
@@@ -114,78 -109,29 +114,78 @@@
    @Test
    def testLoadEmptyLog() {
      createEmptyLogs(logDir, 0)
-     val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 -    new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 +    log.append(TestUtils.singleMessageSet("test".getBytes))
    }
  
 +  /**
 +   * This test case appends a bunch of messages and checks that we can read them all back using sequential offsets.
 +   */
    @Test
 -  def testAppendAndRead() {
 -    val log = new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 -    val message = new Message(Integer.toString(42).getBytes())
 -    for(i <- 0 until 10)
 -      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 -    log.flush()
 -    val messages = log.read(0, 1024)
 -    var current = 0
 -    for(curr <- messages) {
 -      assertEquals("Read message should equal written", message, curr.message)
 -      current += 1
 +  def testAppendAndReadWithSequentialOffsets() {
-     val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, 71, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 +    val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
 +    
 +    for(i <- 0 until messages.length)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
 +    for(i <- 0 until messages.length) {
 +      val read = log.read(i, 100, Some(i+1)).head
 +      assertEquals("Offset read should match order appended.", i, read.offset)
 +      assertEquals("Message should match appended.", messages(i), read.message)
      }
 -    assertEquals(10, current)
 +    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
    }
 -
 +  
 +  /**
 +   * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
 +   * from any offset less than the logEndOffset including offsets not appended.
 +   */
 +  @Test
 +  def testAppendAndReadWithNonSequentialOffsets() {
-     val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, 71, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 +    val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
 +    val messages = messageIds.map(id => new Message(id.toString.getBytes))
 +    
 +    // now test the case that we give the offsets and use non-sequential offsets
 +    for(i <- 0 until messages.length)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
 +    for(i <- 50 until messageIds.max) {
 +      val idx = messageIds.indexWhere(_ >= i)
 +      val read = log.read(i, 100, None).head
 +      assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
 +      assertEquals("Message should match appended.", messages(idx), read.message)
 +    }
 +  }
 +  
 +  /**
 +   * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
 +   * Specifically we create a log where the last message in the first segment has offset 0. If we
 +   * then read offset 1, we should expect this read to come from the second segment, even though the 
 +   * first segment has the greatest lower bound on the offset.
 +   */
 +  @Test
 +  def testReadAtLogGap() {
-     val log = new Log(logDir, time.scheduler, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, 300, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 +    
 +    // keep appending until we have two segments with only a single message in the second segment
 +    while(log.numberOfSegments == 1)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) 
 +    
 +    // now manually truncate off all but one message from the first segment to create a gap in the messages
 +    log.logSegments.head.truncateTo(1)
 +    
 +    assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
 +  }
 +  
 +  /**
 +   * Test reading at the boundary of the log, specifically
 +   * - reading from the logEndOffset should give an empty message set
 +   * - reading beyond the log end offset should throw an OffsetOutOfRangeException
 +   */
    @Test
    def testReadOutOfRange() {
      createEmptyLogs(logDir, 1024)
-     val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 -    val log = new Log(logDir, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, 1024, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
      assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
      try {
        log.read(0, 1024)
@@@ -208,10 -151,10 +208,10 @@@
    @Test
    def testLogRolls() {
      /* create a multipart log with 100 messages */
-     val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 -    val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
      val numMessages = 100
      val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
 -    val offsets = messageSets.map(log.append(_)._1)
 +    val offsets = messageSets.map(log.append(_).firstOffset)
      log.flush
  
      /* do successive reads to ensure all our messages are there */
@@@ -232,8 -173,8 +232,8 @@@
    @Test
    def testCompressedMessages() {
      /* this log should roll after every messageset */
-     val log = new Log(logDir, time.scheduler, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
-     
 -    val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 -    
++    val log = new Log(logDir, time.scheduler, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++
      /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
      log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
      log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
@@@ -246,46 -187,66 +246,46 @@@
      assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
      assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
    }
 -
 -  @Test
 -  def testFindSegment() {
 -    assertEquals("Search in empty segments list should find nothing", None, Log.findRange(makeRanges(), 45))
 -    assertEquals("Search in segment list just outside the range of the last segment should find last segment",
 -                 9, Log.findRange(makeRanges(5, 9, 12), 12).get.start)
 -    assertEquals("Search in segment list far outside the range of the last segment should find last segment",
 -                 9, Log.findRange(makeRanges(5, 9, 12), 100).get.start)
 -    assertEquals("Search in segment list far outside the range of the last segment should find last segment",
 -                 None, Log.findRange(makeRanges(5, 9, 12), -1))
 -    assertContains(makeRanges(5, 9, 12), 11)
 -    assertContains(makeRanges(5), 4)
 -    assertContains(makeRanges(5,8), 5)
 -    assertContains(makeRanges(5,8), 6)
 -  }
    
 +  /**
 +   * Test garbage collecting old segments
 +   */
    @Test
 -  def testEdgeLogRollsStartingAtZero() {
 -    // first test a log segment starting at 0
 -    val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 -    val curOffset = log.logEndOffset
 -    assertEquals(curOffset, 0)
 -
 -    // time goes by; the log file is deleted
 -    log.markDeletedWhile(_ => true)
 -
 -    // we now have a new log; the starting offset of the new log should remain 0
 -    assertEquals(curOffset, log.logEndOffset)
 -    log.delete()
 -  }
 -
 -  @Test
 -  def testEdgeLogRollsStartingAtNonZero() {
 -    // second test an empty log segment starting at non-zero
 -    val log = new Log(logDir, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 -    val numMessages = 1
 -    for(i <- 0 until numMessages)
 -      log.append(TestUtils.singleMessageSet(i.toString.getBytes))
 -    val curOffset = log.logEndOffset
 -    
 -    // time goes by; the log file is deleted
 -    log.markDeletedWhile(_ => true)
 -
 -    // we now have a new log
 -    assertEquals(curOffset, log.logEndOffset)
 -
 -    // time goes by; the log file (which is empty) is deleted again
 -    val deletedSegments = log.markDeletedWhile(_ => true)
 -
 -    // we shouldn't delete the last empty log segment.
 -    assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
 -
 -    // we now have a new log
 -    assertEquals(curOffset, log.logEndOffset)
 +  def testThatGarbageCollectingSegmentsDoesntChangeOffset() {
 +    for(messagesToAppend <- List(0, 1, 25)) {
 +      logDir.mkdirs()
 +      // first test a log segment starting at 0
-       val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
++      val log = new Log(logDir, time.scheduler, 100, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
 +      for(i <- 0 until messagesToAppend)
 +        log.append(TestUtils.singleMessageSet(i.toString.getBytes))
 +      
 +      var currOffset = log.logEndOffset
 +      assertEquals(currOffset, messagesToAppend)
 +
 +      // time goes by; the log file is deleted
 +      log.deleteOldSegments(_ => true)
 +
 +      assertEquals("Deleting segments shouldn't have changed the logEndOffset", currOffset, log.logEndOffset)
 +      assertEquals("We should still have one segment left", 1, log.numberOfSegments)
 +      assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
 +      assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
 +      assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", 
 +                   currOffset,
 +                   log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
 +      
 +      // cleanup the log
 +      log.delete()
 +    }
    }
  
 +  /**
 +   * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the 
 +   * setting and checking that an exception is thrown.
 +   */
    @Test
    def testMessageSizeCheck() {
 -    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes()))
 -    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
 +    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
 +    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes))
  
      // append messages to log
      val maxMessageSize = second.sizeInBytes - 1
@@@ -311,7 -269,7 +311,7 @@@
      val messageSize = 100
      val segmentSize = 7 * messageSize
      val indexInterval = 3 * messageSize
-     var log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
 -    var log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
++    var log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
      for(i <- 0 until numMessages)
        log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
      assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
@@@ -320,42 -278,17 +320,42 @@@
      log.close()
      
      // test non-recovery case
-     log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
 -    log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
++    log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
      assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
 -    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
 -    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
 +    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
 +    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
      log.close()
      
 -    // test 
 -    log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
 +    // test recovery case
-     log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
++    log = new Log(logDir, time.scheduler, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
      assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
 -    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
 -    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
 +    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
 +    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
 +    log.close()
 +  }
 +  
 +  /**
 +   * Test that if we manually delete an index segment it is rebuilt when the log is re-opened
 +   */
 +  @Test
 +  def testIndexRebuild() {
 +    // publish the messages and close the log
 +    val numMessages = 200
-     var log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
++    var log = new Log(logDir, time.scheduler, 200, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
 +    for(i <- 0 until numMessages)
 +      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
 +    val indexFiles = log.logSegments.map(_.index.file)
 +    log.close()
 +    
 +    // delete all the index files
 +    indexFiles.foreach(_.delete())
 +    
 +    // reopen the log
-     log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
++    log = new Log(logDir, time.scheduler, 200, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
 +    
 +    assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
 +    for(i <- 0 until numMessages)
 +      assertEquals(i, log.read(i, 100, None).head.offset)
      log.close()
    }
  
@@@ -370,7 -300,7 +370,7 @@@
      val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
  
      // create a log
-     val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
 -    val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
      assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
  
      for (i<- 1 to msgPerSeg)
@@@ -422,7 -349,7 +422,7 @@@
      val setSize = set.sizeInBytes
      val msgPerSeg = 10
      val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
-     val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
 -    val log = new Log(logDir, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
++    val log = new Log(logDir, time.scheduler, logFileSize, config.messageMaxBytes, 1000, 10000, needsRecovery = false, time = time)
      assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
      for (i<- 1 to msgPerSeg)
        log.append(set)
@@@ -438,7 -365,33 +438,7 @@@
        log.append(set)
      assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
    }
-   
+ 
 -
 -  @Test
 -  def testAppendWithoutOffsetAssignment() {
 -    for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
 -      logDir.mkdir()
 -      var log = new Log(logDir, 
 -                        maxLogFileSize = 64*1024, 
 -                        maxMessageSize = config.messageMaxBytes,
 -                        maxIndexSize = 1000, 
 -                        indexIntervalBytes = 10000, 
 -                        needsRecovery = true)
 -      val messages = List("one", "two", "three", "four", "five", "six")
 -      val ms = new ByteBufferMessageSet(compressionCodec = codec, 
 -                                        offsetCounter = new AtomicLong(5), 
 -                                        messages = messages.map(s => new Message(s.getBytes)):_*)
 -      val firstOffset = ms.shallowIterator.toList.head.offset
 -      val lastOffset = ms.shallowIterator.toList.last.offset
 -      val (first, last) = log.append(ms, assignOffsets = false)
 -      assertEquals(last + 1, log.logEndOffset)
 -      assertEquals(firstOffset, first)
 -      assertEquals(lastOffset, last)
 -      assertTrue(log.read(5, 64*1024).size > 0)
 -      log.delete()
 -    }
 -  }
 -  
    /**
     * When we open a log any index segments without an associated log segment should be deleted.
     */
@@@ -449,10 -402,9 +449,10 @@@
      
      val set = TestUtils.singleMessageSet("test".getBytes())
      val log = new Log(logDir, 
 -                      maxLogFileSize = set.sizeInBytes * 5, 
 +                      time.scheduler,
 +                      maxSegmentSize = set.sizeInBytes * 5, 
-                       maxMessageSize = config.maxMessageSize,
-                       maxIndexSize = 1000, 
+                       maxMessageSize = config.messageMaxBytes,
 -                      maxIndexSize = 1000, 
++                      maxIndexSize = 1000,
                        indexIntervalBytes = 1, 
                        needsRecovery = false)
      
@@@ -475,10 -424,9 +475,10 @@@
  
      // create a log
      var log = new Log(logDir, 
 -                      maxLogFileSize = set.sizeInBytes * 5, 
 +                      time.scheduler,
 +                      maxSegmentSize = set.sizeInBytes * 5, 
-                       maxMessageSize = config.maxMessageSize, 
-                       maxIndexSize = 1000, 
+                       maxMessageSize = config.messageMaxBytes,
 -                      maxIndexSize = 1000, 
++                      maxIndexSize = 1000,
                        indexIntervalBytes = 10000, 
                        needsRecovery = true)
      
@@@ -487,10 -435,9 +487,10 @@@
        log.append(set)
      log.close()
      log = new Log(logDir, 
 -                  maxLogFileSize = set.sizeInBytes * 5, 
 +                  time.scheduler,
 +                  maxSegmentSize = set.sizeInBytes * 5, 
-                   maxMessageSize = config.maxMessageSize, 
-                   maxIndexSize = 1000, 
+                   maxMessageSize = config.messageMaxBytes,
 -                  maxIndexSize = 1000, 
++                  maxIndexSize = 1000,
                    indexIntervalBytes = 10000, 
                    needsRecovery = true)
      log.truncateTo(3)
@@@ -498,68 -445,24 +498,68 @@@
      assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
    }
    
 -  def assertContains(ranges: Array[Range], offset: Long) = {
 -    Log.findRange(ranges, offset) match {
 -      case Some(range) => 
 -        assertTrue(range + " does not contain " + offset, range.contains(offset))
 -      case None => fail("No range found, but expected to find " + offset)
 -    }
 +  /**
 +   * Test that deleted files are deleted after the appropriate time.
 +   */
 +  @Test
 +  def testAsyncDelete() {
 +    val set = TestUtils.singleMessageSet("test".getBytes())
 +    val asyncDeleteMs = 1000
 +    val log = new Log(logDir, 
 +                      time.scheduler,
 +                      maxSegmentSize = set.sizeInBytes * 5, 
-                       maxMessageSize = config.maxMessageSize, 
++                      maxMessageSize = config.messageMaxBytes,
 +                      maxIndexSize = 1000, 
 +                      indexIntervalBytes = 10000, 
 +                      segmentDeleteDelayMs = asyncDeleteMs,
 +                      needsRecovery = true)
 +    
 +    // append some messages to create some segments
 +    for(i <- 0 until 100)
 +      log.append(set)
 +    
 +    // files should be renamed
 +    val segments = log.logSegments.toArray
 +    log.deleteOldSegments((s) => true)
 +    
 +    assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
 +    val renamed = segments.map(segment => new File(segment.log.file.getPath + Log.DeletedFileSuffix))
 +    assertTrue("Files should all be renamed to .deleted.", renamed.forall(_.exists))
 +    
 +    // when enough time passes the files should be deleted
 +    time.sleep(asyncDeleteMs + 1)
 +    assertTrue("Files should all be gone.", renamed.forall(!_.exists))
    }
    
 -  class SimpleRange(val start: Long, val size: Long) extends Range
 -  
 -  def makeRanges(breaks: Int*): Array[Range] = {
 -    val list = new ArrayList[Range]
 -    var prior = 0
 -    for(brk <- breaks) {
 -      list.add(new SimpleRange(prior, brk - prior))
 -      prior = brk
 -    }
 -    list.toArray(new Array[Range](list.size))
 +  /**
 +   * Any files ending in .deleted should be removed when the log is re-opened.
 +   */
 +  @Test
 +  def testOpenDeletesObsoleteFiles() {
 +    val set = TestUtils.singleMessageSet("test".getBytes())
 +    var log = new Log(logDir, 
 +                      time.scheduler,
 +                      maxSegmentSize = set.sizeInBytes * 5, 
-                       maxMessageSize = config.maxMessageSize, 
++                      maxMessageSize = config.messageMaxBytes,
 +                      maxIndexSize = 1000, 
 +                      indexIntervalBytes = 10000, 
 +                      needsRecovery = false)
 +    
 +    // append some messages to create some segments
 +    for(i <- 0 until 100)
 +      log.append(set)
 +    
 +    log.deleteOldSegments((s) => true)
 +    log.close()
 +    
 +    log = new Log(logDir, 
 +                  time.scheduler,
 +                  maxSegmentSize = set.sizeInBytes * 5, 
-                   maxMessageSize = config.maxMessageSize, 
++                  maxMessageSize = config.messageMaxBytes,
 +                  maxIndexSize = 1000, 
 +                  indexIntervalBytes = 10000,
 +                  needsRecovery = false)
 +    assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
    }
    
  }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index f69b379,0000000..e3752cb
mode 100644,000000..100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@@ -1,219 -1,0 +1,219 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.server
 +
 +import java.io.File
 +import kafka.utils._
 +import junit.framework.Assert._
 +import java.util.{Random, Properties}
 +import kafka.consumer.SimpleConsumer
 +import org.junit.{After, Before, Test}
 +import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 +import kafka.zk.ZooKeeperTestHarness
 +import org.scalatest.junit.JUnit3Suite
 +import kafka.admin.CreateTopicCommand
 +import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 +import kafka.utils.TestUtils._
 +import kafka.common.{ErrorMapping, TopicAndPartition}
 +import kafka.utils.nonthreadsafe
 +import kafka.utils.threadsafe
 +import org.junit.After
 +import org.junit.Before
 +import org.junit.Test
 +
 +class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
 +  val random = new Random() 
 +  var logDir: File = null
 +  var topicLogDir: File = null
 +  var server: KafkaServer = null
 +  var logSize: Int = 100
 +  val brokerPort: Int = 9099
 +  var simpleConsumer: SimpleConsumer = null
 +  var time: Time = new MockTime()
 +
 +  @Before
 +  override def setUp() {
 +    super.setUp()
 +    val config: Properties = createBrokerConfig(1, brokerPort)
 +    val logDirPath = config.getProperty("log.dir")
 +    logDir = new File(logDirPath)
 +    time = new MockTime()
 +    server = TestUtils.createServer(new KafkaConfig(config), time)
 +    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
 +  }
 +
 +  @After
 +  override def tearDown() {
 +    simpleConsumer.close
 +    server.shutdown
 +    Utils.rm(logDir)
 +    super.tearDown()
 +  }
 +
 +  @Test
 +  def testGetOffsetsForUnknownTopic() {
 +    val topicAndPartition = TopicAndPartition("foo", 0)
 +    val request = OffsetRequest(
 +      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
 +    val offsetResponse = simpleConsumer.getOffsetsBefore(request)
 +    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
 +                 offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeLatestTime() {
 +    val topicPartition = "kafka-" + 0
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.getOrCreateLog(topic, part)
 +
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
 +    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest = OffsetRequest(
 +      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
 +      replicaId = 0)
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
 +
 +    // try to fetch using latest offset
 +    val fetchResponse = simpleConsumer.fetch(
 +      new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
 +    assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
 +  }
 +
 +  @Test
 +  def testEmptyLogsGetOffsets() {
 +    val topicPartition = "kafka-" + random.nextInt(10)
 +    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
 +    topicLogDir = new File(topicPartitionPath)
 +    topicLogDir.mkdir
 +
 +    val topic = topicPartition.split("-").head
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 +    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 +
 +    var offsetChanged = false
 +    for(i <- 1 to 14) {
 +      val topicAndPartition = TopicAndPartition(topic, 0)
 +      val offsetRequest =
 +        OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
 +      val consumerOffsets =
 +        simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +
 +      if(consumerOffsets(0) == 1) {
 +        offsetChanged = true
 +      }
 +    }
 +    assertFalse(offsetChanged)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeNow() {
 +    val topicPartition = "kafka-" + random.nextInt(3)
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.getOrCreateLog(topic, part)
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
 +    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeEarliestTime() {
 +    val topicPartition = "kafka-" + random.nextInt(3)
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.getOrCreateLog(topic, part)
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10)
 +
 +    assertEquals(Seq(0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest =
 +      OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +    assertEquals(Seq(0L), consumerOffsets)
 +  }
 +
 +  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
 +    val props = new Properties
-     props.put("brokerid", nodeId.toString)
++    props.put("broker.id", nodeId.toString)
 +    props.put("port", port.toString)
 +    props.put("log.dir", getLogDir.getAbsolutePath)
-     props.put("log.flush.interval", "1")
++    props.put("log.flush.interval.messages", "1")
 +    props.put("enable.zookeeper", "false")
 +    props.put("num.partitions", "20")
 +    props.put("log.retention.hours", "10")
 +    props.put("log.cleanup.interval.mins", "5")
-     props.put("log.file.size", logSize.toString)
++    props.put("log.segment.bytes", logSize.toString)
 +    props.put("zk.connect", zkConnect.toString)
 +    props
 +  }
 +
 +  private def getLogDir(): File = {
 +    val dir = TestUtils.tempDir()
 +    dir
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d46d47e,9400328..5547d63
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@@ -123,10 -123,11 +123,10 @@@ object TestUtils extends Logging 
     */
    def createBrokerConfig(nodeId: Int, port: Int): Properties = {
      val props = new Properties
-     props.put("brokerid", nodeId.toString)
-     props.put("hostname", "localhost")
+     props.put("broker.id", nodeId.toString)
+     props.put("host.name", "localhost")
      props.put("port", port.toString)
      props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
 -    props.put("log.flush.interval.messages", "1")
      props.put("zk.connect", TestZKUtils.zookeeperConnect)
      props.put("replica.socket.timeout.ms", "1500")
      props

http://git-wip-us.apache.org/repos/asf/kafka/blob/9249b76d/examples/src/main/java/kafka/examples/Consumer.java
----------------------------------------------------------------------