Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC

[28/28] git commit: merge 0.8 to trunk and resolve conflicts

Updated Branches:
  refs/heads/trunk 999813821 -> 92f177b30


merge 0.8 to trunk and resolve conflicts


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92f177b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92f177b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92f177b3

Branch: refs/heads/trunk
Commit: 92f177b309834a9b572bbab33e8f6198316bd156
Parents: 9998138 40a80fa
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Jan 25 22:03:39 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 25 22:03:39 2013 -0800

----------------------------------------------------------------------
 .gitignore                                         |    2 +
 README.md                                          |   31 ++--
 bin/kafka-run-class.sh                             |   41 +++--
 bin/run-rat.sh                                     |    2 +-
 contrib/hadoop-consumer/build.sbt                  |    1 +
 contrib/hadoop-producer/build.sbt                  |    1 +
 core/build.sbt                                     |   27 +++
 core/src/main/scala/kafka/api/FetchRequest.scala   |   26 +++-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |   26 +++-
 .../main/scala/kafka/api/OffsetCommitRequest.scala |   13 +-
 .../main/scala/kafka/api/OffsetFetchRequest.scala  |   16 ++-
 core/src/main/scala/kafka/api/OffsetRequest.scala  |   24 +++-
 .../src/main/scala/kafka/api/ProducerRequest.scala |   24 +++-
 .../main/scala/kafka/api/RequestOrResponse.scala   |    8 +-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |   27 +++-
 .../scala/kafka/api/TopicMetadataRequest.scala     |   23 +++-
 core/src/main/scala/kafka/client/ClientUtils.scala |    3 +-
 core/src/main/scala/kafka/cluster/Partition.scala  |    4 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala     |    6 +
 .../kafka/consumer/ConsumerFetcherManager.scala    |    5 +-
 .../consumer/ZookeeperConsumerConnector.scala      |    5 +-
 .../scala/kafka/controller/KafkaController.scala   |   61 +++---
 .../kafka/controller/PartitionLeaderSelector.scala |   47 +++---
 .../kafka/controller/PartitionStateMachine.scala   |   26 ++--
 .../kafka/controller/ReplicaStateMachine.scala     |   57 +++---
 core/src/main/scala/kafka/log/LogSegment.scala     |    5 +-
 .../main/scala/kafka/network/RequestChannel.scala  |   14 +-
 .../main/scala/kafka/network/SocketServer.scala    |    5 +-
 .../kafka/producer/async/DefaultEventHandler.scala |    7 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |   24 +--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  100 +----------
 .../scala/kafka/server/ReplicaFetcherManager.scala |    2 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    2 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |    2 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala    |    2 +-
 core/src/main/scala/kafka/utils/Logging.scala      |    2 +-
 core/src/main/scala/kafka/utils/Utils.scala        |    2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   11 +-
 .../src/test/scala/other/kafka/StressTestLog.scala |  112 +++++++++++
 .../test/scala/unit/kafka/utils/UtilsTest.scala    |   12 ++
 examples/build.sbt                                 |    3 +
 lib/apache-rat-0.8.jar                             |  Bin 0 -> 1165578 bytes
 lib/sbt-launch.jar                                 |  Bin 920993 -> 1103618 bytes
 perf/build.sbt                                     |    1 +
 project/Build.scala                                |  143 +++++++++++++++
 project/build.properties                           |    9 +-
 project/plugins.sbt                                |    5 +
 project/plugins/Plugins.scala                      |   23 ---
 .../0.7/bin/kafka-run-class.sh                     |   15 ++
 .../testcase_0011/testcase_0011_properties.json    |   84 +++++++++
 .../testcase_0024/testcase_0024_properties.json    |   86 +++++++++
 .../testcase_0119/testcase_0119_properties.json    |   87 +++++++++
 .../testcase_0128/testcase_0128_properties.json    |   89 +++++++++
 .../testcase_0134/testcase_0134_properties.json    |   92 +++++++++
 .../testcase_0159/testcase_0159_properties.json    |   91 +++++++++
 .../testcase_0209/testcase_0209_properties.json    |   90 +++++++++
 .../testcase_0259/testcase_0259_properties.json    |   90 +++++++++
 .../testcase_0309/testcase_0309_properties.json    |   88 +++++++++
 58 files changed, 1490 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 0a34d5d,0000000..1ca37e2
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@@ -1,100 -1,0 +1,109 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.api
 +
 +import java.nio.ByteBuffer
 +
 +import kafka.api.ApiUtils._
- import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
 +import kafka.utils.Logging
- 
++import kafka.network.{RequestChannel, BoundedByteBufferSend}
++import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
++import kafka.network.RequestChannel.Response
 +object OffsetCommitRequest extends Logging {
 +  val CurrentVersion: Short = 0
 +  val DefaultClientId = ""
 +
 +  def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
 +    // Read values from the envelope
 +    val versionId = buffer.getShort
 +    val correlationId = buffer.getInt
 +    val clientId = readShortString(buffer)
 +
 +    // Read the OffsetCommitRequest
 +    val consumerGroupId = readShortString(buffer)
 +    val topicCount = buffer.getInt
 +    val pairs = (1 to topicCount).flatMap(_ => {
 +      val topic = readShortString(buffer)
 +      val partitionCount = buffer.getInt
 +      (1 to partitionCount).map(_ => {
 +        val partitionId = buffer.getInt
 +        val offset = buffer.getLong
 +        val metadata = readShortString(buffer)
 +        (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
 +      })
 +    })
 +    OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
 +  }
 +}
 +
 +case class OffsetCommitRequest(groupId: String,
 +                               requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
 +                               versionId: Short = OffsetCommitRequest.CurrentVersion,
 +                               correlationId: Int = 0,
 +                               clientId: String = OffsetCommitRequest.DefaultClientId)
 +    extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
 +
 +  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 +  
 +  def writeTo(buffer: ByteBuffer) {
 +    // Write envelope
 +    buffer.putShort(versionId)
 +    buffer.putInt(correlationId)
 +    writeShortString(buffer, clientId)
 +
 +    // Write OffsetCommitRequest
 +    writeShortString(buffer, groupId)             // consumer group
 +    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
 +    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
 +      writeShortString(buffer, t1._1) // topic
 +      buffer.putInt(t1._2.size)       // number of partitions for this topic
 +      t1._2.foreach( t2 => {
 +        buffer.putInt(t2._1.partition)  // partition
 +        buffer.putLong(t2._2.offset)    // offset
 +        writeShortString(buffer, t2._2.metadata) // metadata
 +      })
 +    })
 +  }
 +
 +  override def sizeInBytes =
 +    2 + /* versionId */
 +    4 + /* correlationId */
 +    shortStringLength(clientId) +
 +    shortStringLength(groupId) + 
 +    4 + /* topic count */
 +    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
 +      val (topic, offsets) = topicAndOffsets
 +      count +
 +      shortStringLength(topic) + /* topic */
 +      4 + /* number of partitions */
 +      offsets.foldLeft(0)((innerCount, offsetAndMetadata) => {
 +        innerCount +
 +        4 /* partition */ +
 +        8 /* offset */ +
 +        shortStringLength(offsetAndMetadata._2.metadata)
 +      })
 +    })
++
++  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
++    val responseMap = requestInfo.map {
++      case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
++    }.toMap
++    val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=correlationId)
++    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
++  }
 +}
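
The handleError override above (and its twin in OffsetFetchRequest below) follows a single pattern: map every TopicAndPartition in the failed request to the error code that ErrorMapping derives from the exception, wrap the resulting response in a BoundedByteBufferSend, and push it back through the request channel. A minimal sketch of how a broker-side handler might delegate to this hook; the handler shape and the requestObj accessor are assumptions for illustration, not part of this diff:

    // Hypothetical caller of the handleError hook added above.
    def handleOffsetCommit(request: RequestChannel.Request, requestChannel: RequestChannel) {
      val commitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]  // assumed accessor
      try {
        // ... persist the offsets in commitRequest.requestInfo ...
      } catch {
        case e: Throwable =>
          // every partition in the request is answered with the mapped error code
          commitRequest.handleError(e, requestChannel, request)
      }
    }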

http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 63c1349,0000000..fe94f17
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@@ -1,89 -1,0 +1,101 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + * 
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package kafka.api
 +
 +import java.nio.ByteBuffer
 +
 +import kafka.api.ApiUtils._
- import kafka.common.TopicAndPartition
 +import kafka.utils.Logging
- 
++import kafka.network.{BoundedByteBufferSend, RequestChannel}
++import kafka.network.RequestChannel.Response
++import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
 +object OffsetFetchRequest extends Logging {
 +  val CurrentVersion: Short = 0
 +  val DefaultClientId = ""
 +
 +  def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
 +    // Read values from the envelope
 +    val versionId = buffer.getShort
 +    val correlationId = buffer.getInt
 +    val clientId = readShortString(buffer)
 +
 +    // Read the OffsetFetchRequest
 +    val consumerGroupId = readShortString(buffer)
 +    val topicCount = buffer.getInt
 +    val pairs = (1 to topicCount).flatMap(_ => {
 +      val topic = readShortString(buffer)
 +      val partitionCount = buffer.getInt
 +      (1 to partitionCount).map(_ => {
 +        val partitionId = buffer.getInt
 +        TopicAndPartition(topic, partitionId)
 +      })
 +    })
 +    OffsetFetchRequest(consumerGroupId, pairs, versionId, correlationId, clientId)
 +  }
 +}
 +
 +case class OffsetFetchRequest(groupId: String,
 +                               requestInfo: Seq[TopicAndPartition],
 +                               versionId: Short = OffsetFetchRequest.CurrentVersion,
 +                               correlationId: Int = 0,
 +                               clientId: String = OffsetFetchRequest.DefaultClientId)
 +    extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
 +
 +  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
 +  
 +  def writeTo(buffer: ByteBuffer) {
 +    // Write envelope
 +    buffer.putShort(versionId)
 +    buffer.putInt(correlationId)
 +    writeShortString(buffer, clientId)
 +
 +    // Write OffsetFetchRequest
 +    writeShortString(buffer, groupId)             // consumer group
 +    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
 +    requestInfoGroupedByTopic.foreach( t1 => { // (topic, Seq[TopicAndPartition])
 +      writeShortString(buffer, t1._1) // topic
 +      buffer.putInt(t1._2.size)       // number of partitions for this topic
 +      t1._2.foreach( t2 => {
 +        buffer.putInt(t2.partition)
 +      })
 +    })
 +  }
 +
 +  override def sizeInBytes =
 +    2 + /* versionId */
 +    4 + /* correlationId */
 +    shortStringLength(clientId) +
 +    shortStringLength(groupId) + 
 +    4 + /* topic count */
 +    requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
 +      count + shortStringLength(t._1) + /* topic */
 +      4 + /* number of partitions */
 +      t._2.size * 4 /* partition */
 +    })
++
++  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
++    val responseMap = requestInfo.map {
++      case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
++        offset=OffsetMetadataAndError.InvalidOffset,
++        error=ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
++      ))
++    }.toMap
++    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
++    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
++  }
 +}
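
Because writeTo and sizeInBytes must agree byte for byte, a worked size check is a quick way to validate the layout. The arithmetic below assumes shortStringLength is a 2-byte length prefix plus the string's UTF-8 bytes (per ApiUtils); the object wrapper is illustrative only, not part of this commit:

    import kafka.api.OffsetFetchRequest
    import kafka.common.TopicAndPartition

    // Fetching partitions 0 and 1 of topic "t" for group "group", clientId "client":
    //   2 (versionId) + 4 (correlationId)
    // + 2 + 6 (clientId) + 2 + 5 (groupId)
    // + 4 (topic count) + 2 + 1 (topic) + 4 (partition count) + 2 * 4 (partition ids)
    // = 40 bytes
    object OffsetFetchSizeCheck extends App {
      val request = OffsetFetchRequest("group",
                                       Seq(TopicAndPartition("t", 0), TopicAndPartition("t", 1)),
                                       clientId = "client")
      require(request.sizeInBytes == 40, "got " + request.sizeInBytes)
    }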

http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/LogSegment.scala
index 237cfc4,2e40629..39dd9c2
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@@ -100,12 -86,13 +100,13 @@@ class LogSegment(val log: FileMessageSe
        throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
      if(maxSize == 0)
        return MessageSet.Empty
-       
+     
 -    val logSize = messageSet.sizeInBytes // this may change, need to save a consistent copy
++    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
      val startPosition = translateOffset(startOffset)
      
 -    // if the start position is already off the end of the log, return MessageSet.Empty
 +    // if the start position is already off the end of the log, return null
      if(startPosition == null)
 -      return MessageSet.Empty
 +      return null
      
      // calculate the length of the message set to read based on whether or not they gave us a maxOffset
      val length = 
@@@ -117,10 -104,10 +118,10 @@@
            // there is a max offset, translate it to a file position and use that to calculate the max read size
            if(offset < startOffset)
              throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
 -          val mapping = translateOffset(offset)
 +          val mapping = translateOffset(offset, startPosition.position)
            val endPosition = 
              if(mapping == null)
-               log.sizeInBytes() // the max offset is off the end of the log, use the end of the file
+               logSize // the max offset is off the end of the log, use the end of the file
              else
                mapping.position
            min(endPosition - startPosition.position, maxSize) 
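
A note on the logSize snapshot introduced in this hunk: the backing FileMessageSet can grow while a read is in flight, so the size is read once up front and reused as the end bound when the max offset falls off the end of the log. Re-reading log.sizeInBytes at the point of use could let the bound move between the bounds check and the slice; that moving target is the race the "need to save a consistent copy" comment guards against.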

http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/other/kafka/StressTestLog.scala
index 0000000,78e0548..55429af
mode 000000,100644..100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@@ -1,0 -1,109 +1,112 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  * 
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package kafka
+ 
+ import java.util.concurrent.atomic._
+ import kafka.common._
+ import kafka.message._
+ import kafka.log._
+ import kafka.utils._
+ 
+ /**
+  * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it 
+  * from another thread and checks a few basic assertions until the user kills the process.
+  */
+ object StressTestLog {
+   val running = new AtomicBoolean(true)
+   
+   def main(args: Array[String]) {
+     val dir = TestUtils.tempDir()
 -    val log = new Log(dir, 
 -                      maxLogFileSize = 64*1024*1024, 
++    val time = new MockTime
++    val log = new Log(dir = dir,
++                      scheduler = time.scheduler,
++                      maxSegmentSize = 64*1024*1024,
+                       maxMessageSize = Int.MaxValue, 
+                       flushInterval = Int.MaxValue, 
+                       rollIntervalMs = Long.MaxValue, 
+                       needsRecovery = false,
+                       maxIndexSize = 1024*1024,
 -                      time = SystemTime,
 -                      brokerId = 0)
++                      indexIntervalBytes = 4096,
++                      segmentDeleteDelayMs = 60000,
++                      time = time)
+     val writer = new WriterThread(log)
+     writer.start()
+     val reader = new ReaderThread(log)
+     reader.start()
+     
+     Runtime.getRuntime().addShutdownHook(new Thread() {
+       override def run() = {
+         running.set(false)
+         writer.join()
+         reader.join()
+         Utils.rm(dir)
+       }
+     })
+     
+     while(running.get) {
+       println("Reader offset = %d, writer offset = %d".format(reader.offset, writer.offset))
+       Thread.sleep(1000)
+     }
+   }
+   
+   abstract class WorkerThread extends Thread {
+     override def run() {
+       try {
+         var offset = 0
+         while(running.get)
+           work()
+       } catch {
+         case e: Exception => 
+           e.printStackTrace()
+           running.set(false)
+       }
+       println(getClass.getName + " exiting...")
+     }
+     def work()
+   }
+   
+   class WriterThread(val log: Log) extends WorkerThread {
+     @volatile var offset = 0
+     override def work() {
 -      val offsets = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
 -      require(offsets._1 == offset && offsets._2 == offset)
++      val logAppendInfo = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
++      require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset == offset)
+       offset += 1
+       if(offset % 1000 == 0)
+         Thread.sleep(500)
+     }
+   }
+   
+   class ReaderThread(val log: Log) extends WorkerThread {
+     @volatile var offset = 0
+     override def work() {
+       try {
+         log.read(offset, 1024, Some(offset+1)) match {
+           case read: FileMessageSet if read.sizeInBytes > 0 => {
+             val first = read.head
+             require(first.offset == offset, "We should either read nothing or the message we asked for.")
+             require(MessageSet.entrySize(first.message) == read.sizeInBytes, "Expected %d but got %d.".format(MessageSet.entrySize(first.message), read.sizeInBytes))
+             offset += 1
+           }
+           case _ =>
+         }
+       } catch {
+         case e: OffsetOutOfRangeException => // this is okay
+       }
+     }
+   }
+ }
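
For reference, StressTestLog sits in package kafka under core/src/test/scala/other, so once the test sources are compiled it can be launched with the class-runner script touched earlier in this diff (assuming the test classes land on that script's classpath): bin/kafka-run-class.sh kafka.StressTestLog. It then prints the reader and writer offsets once a second until the process is killed, at which point the shutdown hook stops both threads and removes the temporary log directory.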