You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:11 UTC

[1/30] git commit: Merge branch '0.8' into trunk

Updated Branches:
  refs/heads/trunk 739b2df68 -> 32dae955d


Merge branch '0.8' into trunk

Conflicts:
	core/src/main/scala/kafka/log/Log.scala
	core/src/test/scala/unit/kafka/admin/AdminTest.scala
	core/src/test/scala/unit/kafka/log/LogTest.scala
	core/src/test/scala/unit/kafka/server/LogOffsetTest.scala


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32dae955
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32dae955
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32dae955

Branch: refs/heads/trunk
Commit: 32dae955d5e2e2dd45bddb628cb07c874241d856
Parents: 739b2df 85ec044
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Dec 18 09:43:41 2012 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Dec 18 09:43:41 2012 -0800

----------------------------------------------------------------------
 bin/kafka-producer-shell.sh                        |   17 --
 bin/windows/kafka-server-stop.bat                  |   18 ++
 bin/windows/zookeeper-server-stop.bat              |   17 ++
 .../src/main/java/kafka/etl/KafkaETLContext.java   |    2 +-
 core/src/main/scala/kafka/admin/AdminUtils.scala   |    5 +-
 .../scala/kafka/admin/CreateTopicCommand.scala     |    1 +
 core/src/main/scala/kafka/api/FetchRequest.scala   |    4 +-
 core/src/main/scala/kafka/api/FetchResponse.scala  |   19 +-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |   10 +-
 .../scala/kafka/api/LeaderAndIsrResponse.scala     |   10 +-
 core/src/main/scala/kafka/api/OffsetRequest.scala  |    8 +-
 core/src/main/scala/kafka/api/OffsetResponse.scala |   10 +-
 .../src/main/scala/kafka/api/ProducerRequest.scala |    8 +-
 .../main/scala/kafka/api/ProducerResponse.scala    |    9 +-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |   10 +-
 .../main/scala/kafka/api/StopReplicaResponse.scala |   10 +-
 core/src/main/scala/kafka/api/TopicMetadata.scala  |  120 +++-------
 .../scala/kafka/api/TopicMetadataRequest.scala     |   13 +-
 .../scala/kafka/api/TopicMetadataResponse.scala    |   35 ++-
 core/src/main/scala/kafka/client/ClientUtils.scala |   32 ++-
 core/src/main/scala/kafka/cluster/Broker.scala     |   20 +-
 .../scala/kafka/common/ClientIdAndBroker.scala     |   26 ++
 .../main/scala/kafka/common/ClientIdAndTopic.scala |   27 ++
 core/src/main/scala/kafka/common/Config.scala      |   40 +++
 core/src/main/scala/kafka/common/Topic.scala       |   40 +++
 .../scala/kafka/common/TopicExistsException.scala  |   22 ++
 .../scala/kafka/consumer/ConsoleConsumer.scala     |   27 ++-
 .../main/scala/kafka/consumer/ConsumerConfig.scala |   33 +++-
 .../kafka/consumer/ConsumerFetcherManager.scala    |    2 +-
 .../scala/kafka/consumer/ConsumerIterator.scala    |    8 +-
 .../scala/kafka/consumer/ConsumerTopicStat.scala   |   40 ---
 .../scala/kafka/consumer/ConsumerTopicStats.scala  |   57 +++++
 .../consumer/FetchRequestAndResponseStats.scala    |   58 +++++
 .../main/scala/kafka/consumer/KafkaStream.scala    |    5 +-
 .../scala/kafka/consumer/PartitionTopicInfo.scala  |    9 +-
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   44 ++--
 .../consumer/ZookeeperConsumerConnector.scala      |   23 +-
 .../scala/kafka/controller/KafkaController.scala   |    2 +-
 .../kafka/controller/PartitionLeaderSelector.scala |    6 +-
 .../kafka/controller/PartitionStateMachine.scala   |    4 +-
 .../kafka/controller/ReplicaStateMachine.scala     |    2 +-
 .../scala/kafka/javaapi/TopicMetadataRequest.scala |    7 +-
 .../kafka/javaapi/consumer/SimpleConsumer.scala    |    6 +-
 .../consumer/ZookeeperConsumerConnector.scala      |    1 -
 core/src/main/scala/kafka/log/Log.scala            |   13 +-
 core/src/main/scala/kafka/log/LogManager.scala     |    2 +-
 core/src/main/scala/kafka/message/Message.scala    |   66 +++---
 .../kafka/metrics/KafkaCSVMetricsReporter.scala    |    1 -
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    8 +-
 .../scala/kafka/producer/ConsoleProducer.scala     |   67 ++++-
 .../scala/kafka/producer/DefaultPartitioner.scala  |    1 -
 core/src/main/scala/kafka/producer/Producer.scala  |   50 +---
 .../main/scala/kafka/producer/ProducerConfig.scala |   31 +++-
 .../main/scala/kafka/producer/ProducerPool.scala   |   18 +-
 .../kafka/producer/ProducerRequestStats.scala      |   56 +++++
 .../main/scala/kafka/producer/ProducerStats.scala  |   40 +++
 .../scala/kafka/producer/ProducerTopicStats.scala  |   57 +++++
 .../main/scala/kafka/producer/SyncProducer.scala   |   24 +-
 .../scala/kafka/producer/SyncProducerConfig.scala  |    8 +-
 .../kafka/producer/async/AsyncProducerStats.scala  |   25 --
 .../kafka/producer/async/DefaultEventHandler.scala |   65 +++---
 .../kafka/producer/async/ProducerSendThread.scala  |   11 +-
 core/src/main/scala/kafka/serializer/Decoder.scala |    8 -
 core/src/main/scala/kafka/serializer/Encoder.scala |    2 -
 .../scala/kafka/server/AbstractFetcherThread.scala |   52 +++--
 .../kafka/server/HighwaterMarkCheckpoint.scala     |    2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   81 ++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala |    8 +-
 .../scala/kafka/server/KafkaRequestHandler.scala   |    8 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    8 +-
 .../main/scala/kafka/server/KafkaZooKeeper.scala   |    3 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    2 +-
 .../main/scala/kafka/server/RequestPurgatory.scala |   76 +++---
 .../scala/kafka/tools/ConsumerOffsetChecker.scala  |    2 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   16 +-
 .../main/scala/kafka/tools/GetOffsetShell.scala    |    2 +-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |    1 -
 .../src/main/scala/kafka/tools/ProducerShell.scala |   71 ------
 .../main/scala/kafka/tools/ReplayLogProducer.scala |    2 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala    |   10 +-
 .../main/scala/kafka/tools/UpdateOffsetsInZK.scala |    2 +-
 core/src/main/scala/kafka/utils/Topic.scala        |   41 ---
 core/src/main/scala/kafka/utils/Utils.scala        |    2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |    6 +-
 .../test/scala/other/kafka/TestKafkaAppender.scala |    1 -
 .../scala/other/kafka/TestZKConsumerOffsets.scala  |    1 -
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   55 ++--
 .../api/RequestResponseSerializationTest.scala     |   21 +-
 .../test/scala/unit/kafka/common/ConfigTest.scala  |   89 +++++++
 .../test/scala/unit/kafka/common/TopicTest.scala   |   61 +++++
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |   10 +-
 .../kafka/integration/AutoOffsetResetTest.scala    |    1 -
 .../scala/unit/kafka/integration/FetcherTest.scala |   10 +-
 .../kafka/integration/LazyInitProducerTest.scala   |    2 +-
 .../unit/kafka/integration/PrimitiveApiTest.scala  |    1 -
 .../integration/ProducerConsumerTestHarness.scala  |    6 +-
 .../unit/kafka/integration/TopicMetadataTest.scala |    2 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala  |    2 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |   27 ++
 .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala  |    3 +-
 .../unit/kafka/network/SocketServerTest.scala      |    2 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    |   58 +++---
 .../scala/unit/kafka/producer/ProducerTest.scala   |    5 +-
 .../unit/kafka/producer/SyncProducerTest.scala     |    8 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |    2 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   10 +-
 .../scala/unit/kafka/server/LogRecoveryTest.scala  |    2 -
 .../unit/kafka/server/ServerShutdownTest.scala     |    7 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   37 ++--
 .../test/scala/unit/kafka/utils/TopicTest.scala    |   61 -----
 .../java/kafka/examples/SimpleConsumerDemo.java    |    3 +-
 .../scala/kafka/perf/ProducerPerformance.scala     |   19 +-
 .../kafka/perf/SimpleConsumerPerformance.scala     |    2 +-
 project/build/KafkaProject.scala                   |    2 +-
 .../migration_tool_test.py                         |   14 +-
 .../mirror_maker_testsuite/mirror_maker_test.py    |   17 +-
 .../replication_testsuite/replica_basic_test.py    |   35 ++--
 .../testcase_9051/cluster_config.json              |   63 +-----
 .../testcase_9051/testcase_9051_properties.json    |   56 +----
 system_test/testcase_to_run.json                   |    2 +-
 system_test/utils/kafka_system_test_utils.py       |  191 +++++++++++++--
 system_test/utils/testcase_env.py                  |    5 +
 123 files changed, 1634 insertions(+), 1078 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/Log.scala
index d107d71,66c07af..1654dbf
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@@ -93,26 -143,25 +93,35 @@@ class Log(val dir: File
      val ls = dir.listFiles()
      if(ls != null) {
        for(file <- ls if file.isFile) {
 -        val filename = file.getName()
 -        if(!file.canRead) {
++        if(!file.canRead)
+           throw new IOException("Could not read file " + file)
 -        } else if(filename.endsWith(IndexFileSuffix)) {
 -          // ensure that we have a corresponding log file for this index file
 -          val log = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
 -          if(!log.exists) {
 +        val filename = file.getName
 +        if(filename.endsWith(DeletedFileSuffix)) {
++          // if the file ends in .deleted, delete it
 +          val deleted = file.delete()
 +          if(!deleted)
 +            warn("Attempt to delete defunct segment file %s failed.".format(filename))
++        }  else if(filename.endsWith(IndexFileSuffix)) {
++          // if it is an index file, make sure it has a corresponding .log file
++          val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
++          if(!logFile.exists) {
+             warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
+             file.delete()
+           }
          } else if(filename.endsWith(LogFileSuffix)) {
-           if(!file.canRead)
-             throw new IOException("Could not read file " + file)
 -          val offset = filename.substring(0, filename.length - LogFileSuffix.length).toLong
 -          // TODO: we should ideally rebuild any missing index files, instead of erroring out
 -          if(!Log.indexFilename(dir, offset).exists)
 -            throw new IllegalStateException("Found log file with no corresponding index file.")
 -          logSegments.add(new LogSegment(dir = dir, 
 -                                         startOffset = offset,
 -                                         indexIntervalBytes = indexIntervalBytes, 
 -                                         maxIndexSize = maxIndexSize))
++          // if it's a log file, load the corresponding log segment
 +          val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
 +          val hasIndex = Log.indexFilename(dir, start).exists
 +          val segment = new LogSegment(dir = dir, 
 +                                       startOffset = start,
 +                                       indexIntervalBytes = indexIntervalBytes, 
 +                                       maxIndexSize = maxIndexSize)
 +          if(!hasIndex) {
 +            // this can only happen if someone manually deletes the index file
 +            error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
 +            segment.recover(maxMessageSize)
 +          }
 +          logSegments.put(start, segment)
          }
        }
      }

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/producer/ProducerConfig.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/producer/ProducerConfig.scala
index 72f68dc,e559187..6c85577
--- a/core/src/main/scala/kafka/producer/ProducerConfig.scala
+++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala
@@@ -81,8 -111,7 +108,10 @@@ class ProducerConfig private (val props
     */
    val producerRetries = props.getInt("producer.num.retries", 3)
  
 +  /**
 +   * The amount of time to wait in between retries
 +   */
    val producerRetryBackoffMs = props.getInt("producer.retry.backoff.ms", 100)
+ 
+   validate(this)
  }

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/server/KafkaApis.scala
index eff627c,5a85b04..ef3b66e
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@@ -244,12 -245,7 +246,12 @@@ class KafkaApis(val requestChannel: Req
        try {
          val localReplica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
          val log = localReplica.log.get
 -        val (start, end) = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
 +        val info = log.append(messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
 +        
 +        // update stats
-         BrokerTopicStat.getBrokerTopicStat(topicAndPartition.topic).messagesInRate.mark(info.count)
-         BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(info.count)
++        BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(info.count)
++        BrokerTopicStats.getBrokerAllTopicStats.messagesInRate.mark(info.count)
 +        
          // we may need to increment high watermark since ISR could be down to 1
          localReplica.partition.maybeIncrementLeaderHW(localReplica)
          trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
@@@ -418,62 -410,9 +419,62 @@@
            (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
        }
      })
-     val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
+     val response = OffsetResponse(offsetRequest.correlationId, responseMap)
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
    }
 +  
 +  def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
 +    logManager.getLog(topicAndPartition.topic, topicAndPartition.partition) match {
 +      case Some(log) => 
 +        fetchOffsetsBefore(log, timestamp, maxNumOffsets)
 +      case None => 
 +        if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
 +          Seq(0L)
 +        else
 +          Nil
 +    }
 +  }
 +  
 +  def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
 +    val segsArray = log.logSegments.toArray
 +    var offsetTimeArray: Array[(Long, Long)] = null
 +    if(segsArray.last.size > 0)
 +      offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
 +    else
 +      offsetTimeArray = new Array[(Long, Long)](segsArray.length)
 +
 +    for(i <- 0 until segsArray.length)
 +      offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified)
 +    if(segsArray.last.size > 0)
 +      offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds)
 +
 +    var startIndex = -1
 +    timestamp match {
 +      case OffsetRequest.LatestTime =>
 +        startIndex = offsetTimeArray.length - 1
 +      case OffsetRequest.EarliestTime =>
 +        startIndex = 0
 +      case _ =>
 +        var isFound = false
 +        debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
 +        startIndex = offsetTimeArray.length - 1
 +        while (startIndex >= 0 && !isFound) {
 +          if (offsetTimeArray(startIndex)._2 <= timestamp)
 +            isFound = true
 +          else
 +            startIndex -=1
 +        }
 +    }
 +
 +    val retSize = maxNumOffsets.min(startIndex + 1)
 +    val ret = new Array[Long](retSize)
 +    for(j <- 0 until retSize) {
 +      ret(j) = offsetTimeArray(startIndex)._1
 +      startIndex -= 1
 +    }
 +    // ensure that the returned seq is in descending order of offsets
 +    ret.toSeq.sortBy(- _)
 +  }
  
    /**
     * Service the topic metadata request API

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7e17da4,7a31b51..3b61d2a
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@@ -374,33 -374,38 +374,34 @@@ class AdminTest extends JUnit3Suite wit
      var controllerId = ZkUtils.getController(zkClient)
      var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
      var partitionsRemaining = controller.shutdownBroker(2)
-     assertEquals(0, partitionsRemaining)
-     var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-     var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-     assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-     assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
- 
-     leaderBeforeShutdown = leaderAfterShutdown
-     controllerId = ZkUtils.getController(zkClient)
-     controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
-     partitionsRemaining = controller.shutdownBroker(1)
-     assertEquals(0, partitionsRemaining)
-     topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-     leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-     assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-     assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+     try {
+       assertEquals(0, partitionsRemaining)
+       var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+       var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+       assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
 -      // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
 -      assertEquals(2, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
  
-     leaderBeforeShutdown = leaderAfterShutdown
-     controllerId = ZkUtils.getController(zkClient)
-     controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
-     partitionsRemaining = controller.shutdownBroker(0)
-     assertEquals(1, partitionsRemaining)
-     topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-     leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-     assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-     assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+       leaderBeforeShutdown = leaderAfterShutdown
+       controllerId = ZkUtils.getController(zkClient)
+       controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+       partitionsRemaining = controller.shutdownBroker(1)
+       assertEquals(0, partitionsRemaining)
+       topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+       leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+       assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
 -      // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
+       assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
  
-     servers.foreach(_.shutdown())
+       leaderBeforeShutdown = leaderAfterShutdown
+       controllerId = ZkUtils.getController(zkClient)
+       controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+       partitionsRemaining = controller.shutdownBroker(0)
+       assertEquals(1, partitionsRemaining)
+       topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+       leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
+       assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
+       assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
 -    }
 -    finally {
++    } finally {
+       servers.foreach(_.shutdown())
+     }
    }
  
    private def checkIfReassignPartitionPathExists(): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/log/LogTest.scala
index fbd738a,900d0e2..109474c
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@@ -438,10 -365,59 +438,37 @@@ class LogTest extends JUnitSuite 
        log.append(set)
      assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
    }
 -
 -
 -  @Test
 -  def testAppendWithoutOffsetAssignment() {
 -    for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
 -      logDir.mkdir()
 -      var log = new Log(logDir, 
 -                        maxLogFileSize = 64*1024, 
 -                        maxMessageSize = config.maxMessageSize, 
 -                        maxIndexSize = 1000, 
 -                        indexIntervalBytes = 10000, 
 -                        needsRecovery = true)
 -      val messages = List("one", "two", "three", "four", "five", "six")
 -      val ms = new ByteBufferMessageSet(compressionCodec = codec, 
 -                                        offsetCounter = new AtomicLong(5), 
 -                                        messages = messages.map(s => new Message(s.getBytes)):_*)
 -      val firstOffset = ms.shallowIterator.toList.head.offset
 -      val lastOffset = ms.shallowIterator.toList.last.offset
 -      val (first, last) = log.append(ms, assignOffsets = false)
 -      assertEquals(last + 1, log.logEndOffset)
 -      assertEquals(firstOffset, first)
 -      assertEquals(lastOffset, last)
 -      assertTrue(log.read(5, 64*1024).size > 0)
 -      log.delete()
 -    }
 -  }
+   
+   /**
+    * When we open a log any index segments without an associated log segment should be deleted.
+    */
+   @Test
+   def testBogusIndexSegmentsAreRemoved() {
+     val bogusIndex1 = Log.indexFilename(logDir, 0)
+     val bogusIndex2 = Log.indexFilename(logDir, 5)
+     
+     val set = TestUtils.singleMessageSet("test".getBytes())
+     val log = new Log(logDir, 
 -                      maxLogFileSize = set.sizeInBytes * 5, 
++                      time.scheduler,
++                      maxSegmentSize = set.sizeInBytes * 5, 
+                       maxMessageSize = config.maxMessageSize,
+                       maxIndexSize = 1000, 
+                       indexIntervalBytes = 1, 
+                       needsRecovery = false)
+     
+     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
+     assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
+     
+     // check that we can append to the log
+     for(i <- 0 until 10)
+       log.append(set)
+       
+     log.delete()
+   }
  
 +  /**
 +   * Verify that truncation works correctly after re-opening the log
 +   */
    @Test
    def testReopenThenTruncate() {
      val set = TestUtils.singleMessageSet("test".getBytes())

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index aa58dce,0000000..f69b379
mode 100644,000000..100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@@ -1,219 -1,0 +1,219 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.server
 +
 +import java.io.File
 +import kafka.utils._
 +import junit.framework.Assert._
 +import java.util.{Random, Properties}
 +import kafka.consumer.SimpleConsumer
 +import org.junit.{After, Before, Test}
 +import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 +import kafka.zk.ZooKeeperTestHarness
 +import org.scalatest.junit.JUnit3Suite
 +import kafka.admin.CreateTopicCommand
 +import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 +import kafka.utils.TestUtils._
 +import kafka.common.{ErrorMapping, TopicAndPartition}
 +import kafka.utils.nonthreadsafe
 +import kafka.utils.threadsafe
 +import org.junit.After
 +import org.junit.Before
 +import org.junit.Test
 +
 +class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
 +  val random = new Random() 
 +  var logDir: File = null
 +  var topicLogDir: File = null
 +  var server: KafkaServer = null
 +  var logSize: Int = 100
 +  val brokerPort: Int = 9099
 +  var simpleConsumer: SimpleConsumer = null
 +  var time: Time = new MockTime()
 +
 +  @Before
 +  override def setUp() {
 +    super.setUp()
 +    val config: Properties = createBrokerConfig(1, brokerPort)
 +    val logDirPath = config.getProperty("log.dir")
 +    logDir = new File(logDirPath)
 +    time = new MockTime()
 +    server = TestUtils.createServer(new KafkaConfig(config), time)
-     simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
++    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
 +  }
 +
 +  @After
 +  override def tearDown() {
 +    simpleConsumer.close
 +    server.shutdown
 +    Utils.rm(logDir)
 +    super.tearDown()
 +  }
 +
 +  @Test
 +  def testGetOffsetsForUnknownTopic() {
 +    val topicAndPartition = TopicAndPartition("foo", 0)
 +    val request = OffsetRequest(
 +      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
 +    val offsetResponse = simpleConsumer.getOffsetsBefore(request)
 +    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
 +                 offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeLatestTime() {
 +    val topicPartition = "kafka-" + 0
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.getOrCreateLog(topic, part)
 +
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime, 10)
-     assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
++    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest = OffsetRequest(
 +      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
 +      replicaId = 0)
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-     assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
++    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
 +
 +    // try to fetch using latest offset
 +    val fetchResponse = simpleConsumer.fetch(
 +      new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build())
 +    assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext)
 +  }
 +
 +  @Test
 +  def testEmptyLogsGetOffsets() {
 +    val topicPartition = "kafka-" + random.nextInt(10)
 +    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
 +    topicLogDir = new File(topicPartitionPath)
 +    topicLogDir.mkdir
 +
 +    val topic = topicPartition.split("-").head
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 +    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 +
 +    var offsetChanged = false
 +    for(i <- 1 to 14) {
 +      val topicAndPartition = TopicAndPartition(topic, 0)
 +      val offsetRequest =
 +        OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
 +      val consumerOffsets =
 +        simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +
 +      if(consumerOffsets(0) == 1) {
 +        offsetChanged = true
 +      }
 +    }
 +    assertFalse(offsetChanged)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeNow() {
 +    val topicPartition = "kafka-" + random.nextInt(3)
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.getOrCreateLog(topic, part)
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now, 10)
-     assertEquals(Seq(20L, 15L, 10L, 5L, 0L), offsets)
++    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
-     assertEquals(Seq(20L, 15L, 10L, 5L, 0L), consumerOffsets)
++    assertEquals(Seq(20L, 16L, 12L, 8L, 4L, 0L), consumerOffsets)
 +  }
 +
 +  @Test
 +  def testGetOffsetsBeforeEarliestTime() {
 +    val topicPartition = "kafka-" + random.nextInt(3)
 +    val topic = topicPartition.split("-").head
 +    val part = Integer.valueOf(topicPartition.split("-").last).intValue
 +
 +    // setup brokers in zookeeper as owners of partitions for this test
 +    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 +
 +    val logManager = server.getLogManager
 +    val log = logManager.getOrCreateLog(topic, part)
 +    val message = new Message(Integer.toString(42).getBytes())
 +    for(i <- 0 until 20)
 +      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
 +    log.flush()
 +
 +    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime, 10)
 +
 +    assertEquals(Seq(0L), offsets)
 +
 +    waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
 +    val topicAndPartition = TopicAndPartition(topic, part)
 +    val offsetRequest =
 +      OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
 +    val consumerOffsets =
 +      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 +    assertEquals(Seq(0L), consumerOffsets)
 +  }
 +
 +  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
 +    val props = new Properties
 +    props.put("brokerid", nodeId.toString)
 +    props.put("port", port.toString)
 +    props.put("log.dir", getLogDir.getAbsolutePath)
 +    props.put("log.flush.interval", "1")
 +    props.put("enable.zookeeper", "false")
 +    props.put("num.partitions", "20")
 +    props.put("log.retention.hours", "10")
 +    props.put("log.cleanup.interval.mins", "5")
 +    props.put("log.file.size", logSize.toString)
 +    props.put("zk.connect", zkConnect.toString)
 +    props
 +  }
 +
 +  private def getLogDir(): File = {
 +    val dir = TestUtils.tempDir()
 +    dir
 +  }
 +
 +}

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/32dae955/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------