You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC
[28/28] git commit: merge 0.8 to trunk and resolve conflicts
Updated Branches:
refs/heads/trunk 999813821 -> 92f177b30
merge 0.8 to trunk and resolve conflicts
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92f177b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92f177b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92f177b3
Branch: refs/heads/trunk
Commit: 92f177b309834a9b572bbab33e8f6198316bd156
Parents: 9998138 40a80fa
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Jan 25 22:03:39 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 25 22:03:39 2013 -0800
----------------------------------------------------------------------
.gitignore | 2 +
README.md | 31 ++--
bin/kafka-run-class.sh | 41 +++--
bin/run-rat.sh | 2 +-
contrib/hadoop-consumer/build.sbt | 1 +
contrib/hadoop-producer/build.sbt | 1 +
core/build.sbt | 27 +++
core/src/main/scala/kafka/api/FetchRequest.scala | 26 +++-
.../main/scala/kafka/api/LeaderAndIsrRequest.scala | 26 +++-
.../main/scala/kafka/api/OffsetCommitRequest.scala | 13 +-
.../main/scala/kafka/api/OffsetFetchRequest.scala | 16 ++-
core/src/main/scala/kafka/api/OffsetRequest.scala | 24 +++-
.../src/main/scala/kafka/api/ProducerRequest.scala | 24 +++-
.../main/scala/kafka/api/RequestOrResponse.scala | 8 +-
.../main/scala/kafka/api/StopReplicaRequest.scala | 27 +++-
.../scala/kafka/api/TopicMetadataRequest.scala | 23 +++-
core/src/main/scala/kafka/client/ClientUtils.scala | 3 +-
core/src/main/scala/kafka/cluster/Partition.scala | 4 +-
.../scala/kafka/consumer/ConsoleConsumer.scala | 6 +
.../kafka/consumer/ConsumerFetcherManager.scala | 5 +-
.../consumer/ZookeeperConsumerConnector.scala | 5 +-
.../scala/kafka/controller/KafkaController.scala | 61 +++---
.../kafka/controller/PartitionLeaderSelector.scala | 47 +++---
.../kafka/controller/PartitionStateMachine.scala | 26 ++--
.../kafka/controller/ReplicaStateMachine.scala | 57 +++---
core/src/main/scala/kafka/log/LogSegment.scala | 5 +-
.../main/scala/kafka/network/RequestChannel.scala | 14 +-
.../main/scala/kafka/network/SocketServer.scala | 5 +-
.../kafka/producer/async/DefaultEventHandler.scala | 7 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 24 +--
core/src/main/scala/kafka/server/KafkaApis.scala | 100 +----------
.../scala/kafka/server/ReplicaFetcherManager.scala | 2 +-
.../scala/kafka/server/ReplicaFetcherThread.scala | 2 +-
.../main/scala/kafka/tools/KafkaMigrationTool.java | 2 +-
.../scala/kafka/tools/SimpleConsumerShell.scala | 2 +-
core/src/main/scala/kafka/utils/Logging.scala | 2 +-
core/src/main/scala/kafka/utils/Utils.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 11 +-
.../src/test/scala/other/kafka/StressTestLog.scala | 112 +++++++++++
.../test/scala/unit/kafka/utils/UtilsTest.scala | 12 ++
examples/build.sbt | 3 +
lib/apache-rat-0.8.jar | Bin 0 -> 1165578 bytes
lib/sbt-launch.jar | Bin 920993 -> 1103618 bytes
perf/build.sbt | 1 +
project/Build.scala | 143 +++++++++++++++
project/build.properties | 9 +-
project/plugins.sbt | 5 +
project/plugins/Plugins.scala | 23 ---
.../0.7/bin/kafka-run-class.sh | 15 ++
.../testcase_0011/testcase_0011_properties.json | 84 +++++++++
.../testcase_0024/testcase_0024_properties.json | 86 +++++++++
.../testcase_0119/testcase_0119_properties.json | 87 +++++++++
.../testcase_0128/testcase_0128_properties.json | 89 +++++++++
.../testcase_0134/testcase_0134_properties.json | 92 +++++++++
.../testcase_0159/testcase_0159_properties.json | 91 +++++++++
.../testcase_0209/testcase_0209_properties.json | 90 +++++++++
.../testcase_0259/testcase_0259_properties.json | 90 +++++++++
.../testcase_0309/testcase_0309_properties.json | 88 +++++++++
58 files changed, 1490 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 0a34d5d,0000000..1ca37e2
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@@ -1,100 -1,0 +1,109 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
- import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import kafka.utils.Logging
-
++import kafka.network.{RequestChannel, BoundedByteBufferSend}
++import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
++import kafka.network.RequestChannel.Response
+object OffsetCommitRequest extends Logging {
+ val CurrentVersion: Short = 0
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
+ // Read values from the envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = readShortString(buffer)
+
+ // Read the OffsetRequest
+ val consumerGroupId = readShortString(buffer)
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topic = readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partitionId = buffer.getInt
+ val offset = buffer.getLong
+ val metadata = readShortString(buffer)
+ (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
+ })
+ })
+ OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
+ }
+}
+
+case class OffsetCommitRequest(groupId: String,
+ requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
+ versionId: Short = OffsetCommitRequest.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = OffsetCommitRequest.DefaultClientId)
+ extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
+
+ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+ def writeTo(buffer: ByteBuffer) {
+ // Write envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ writeShortString(buffer, clientId)
+
+ // Write OffsetCommitRequest
+ writeShortString(buffer, groupId) // consumer group
+ buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+ requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
+ writeShortString(buffer, t1._1) // topic
+ buffer.putInt(t1._2.size) // number of partitions for this topic
+ t1._2.foreach( t2 => {
+ buffer.putInt(t2._1.partition) // partition
+ buffer.putLong(t2._2.offset) // offset
+ writeShortString(buffer, t2._2.metadata) // metadata
+ })
+ })
+ }
+
+ override def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ shortStringLength(clientId) +
+ shortStringLength(groupId) +
+ 4 + /* topic count */
+ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+ val (topic, offsets) = topicAndOffsets
+ count +
+ shortStringLength(topic) + /* topic */
+ 4 + /* number of partitions */
+ offsets.foldLeft(0)((innerCount, offsetAndMetadata) => {
+ innerCount +
+ 4 /* partition */ +
+ 8 /* offset */ +
+ shortStringLength(offsetAndMetadata._2.metadata)
+ })
+ })
++
++ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
++ val responseMap = requestInfo.map {
++ case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
++ }.toMap
++ val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=correlationId)
++ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
++ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 63c1349,0000000..fe94f17
mode 100644,000000..100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@@ -1,89 -1,0 +1,101 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
- import kafka.common.TopicAndPartition
+import kafka.utils.Logging
-
++import kafka.network.{BoundedByteBufferSend, RequestChannel}
++import kafka.network.RequestChannel.Response
++import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+object OffsetFetchRequest extends Logging {
+ val CurrentVersion: Short = 0
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
+ // Read values from the envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = readShortString(buffer)
+
+ // Read the OffsetFetchRequest
+ val consumerGroupId = readShortString(buffer)
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topic = readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partitionId = buffer.getInt
+ TopicAndPartition(topic, partitionId)
+ })
+ })
+ OffsetFetchRequest(consumerGroupId, pairs, versionId, correlationId, clientId)
+ }
+}
+
+case class OffsetFetchRequest(groupId: String,
+ requestInfo: Seq[TopicAndPartition],
+ versionId: Short = OffsetFetchRequest.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = OffsetFetchRequest.DefaultClientId)
+ extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
+
+ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
+
+ def writeTo(buffer: ByteBuffer) {
+ // Write envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ writeShortString(buffer, clientId)
+
+ // Write OffsetFetchRequest
+ writeShortString(buffer, groupId) // consumer group
+ buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+ requestInfoGroupedByTopic.foreach( t1 => { // (topic, Seq[TopicAndPartition])
+ writeShortString(buffer, t1._1) // topic
+ buffer.putInt(t1._2.size) // number of partitions for this topic
+ t1._2.foreach( t2 => {
+ buffer.putInt(t2.partition)
+ })
+ })
+ }
+
+ override def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ shortStringLength(clientId) +
+ shortStringLength(groupId) +
+ 4 + /* topic count */
+ requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
+ count + shortStringLength(t._1) + /* topic */
+ 4 + /* number of partitions */
+ t._2.size * 4 /* partition */
+ })
++
++ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
++ val responseMap = requestInfo.map {
++ case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
++ offset=OffsetMetadataAndError.InvalidOffset,
++ error=ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
++ ))
++ }.toMap
++ val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
++ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
++ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/log/LogSegment.scala
index 237cfc4,2e40629..39dd9c2
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@@ -100,12 -86,13 +100,13 @@@ class LogSegment(val log: FileMessageSe
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
if(maxSize == 0)
return MessageSet.Empty
-
+
- val logSize = messageSet.sizeInBytes // this may change, need to save a consistent copy
++ val logSize = log.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset)
- // if the start position is already off the end of the log, return MessageSet.Empty
+ // if the start position is already off the end of the log, return null
if(startPosition == null)
- return MessageSet.Empty
+ return null
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
@@@ -117,10 -104,10 +118,10 @@@
// there is a max offset, translate it to a file position and use that to calculate the max read size
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
- val mapping = translateOffset(offset)
+ val mapping = translateOffset(offset, startPosition.position)
val endPosition =
if(mapping == null)
- log.sizeInBytes() // the max offset is off the end of the log, use the end of the file
+ logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/92f177b3/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/other/kafka/StressTestLog.scala
index 0000000,78e0548..55429af
mode 000000,100644..100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@@ -1,0 -1,109 +1,112 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package kafka
+
+ import java.util.concurrent.atomic._
+ import kafka.common._
+ import kafka.message._
+ import kafka.log._
+ import kafka.utils._
+
+ /**
+ * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it
+ * from another thread and checks a few basic assertions until the user kills the process.
+ */
+ object StressTestLog {
+ val running = new AtomicBoolean(true)
+
+ def main(args: Array[String]) {
+ val dir = TestUtils.tempDir()
- val log = new Log(dir,
- maxLogFileSize = 64*1024*1024,
++ val time = new MockTime
++ val log = new Log(dir = dir,
++ scheduler = time.scheduler,
++ maxSegmentSize = 64*1024*1024,
+ maxMessageSize = Int.MaxValue,
+ flushInterval = Int.MaxValue,
+ rollIntervalMs = Long.MaxValue,
+ needsRecovery = false,
+ maxIndexSize = 1024*1024,
- time = SystemTime,
- brokerId = 0)
++ indexIntervalBytes = 4096,
++ segmentDeleteDelayMs = 60000,
++ time = time)
+ val writer = new WriterThread(log)
+ writer.start()
+ val reader = new ReaderThread(log)
+ reader.start()
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ override def run() = {
+ running.set(false)
+ writer.join()
+ reader.join()
+ Utils.rm(dir)
+ }
+ })
+
+ while(running.get) {
+ println("Reader offset = %d, writer offset = %d".format(reader.offset, writer.offset))
+ Thread.sleep(1000)
+ }
+ }
+
+ abstract class WorkerThread extends Thread {
+ override def run() {
+ try {
+ var offset = 0
+ while(running.get)
+ work()
+ } catch {
+ case e: Exception =>
+ e.printStackTrace()
+ running.set(false)
+ }
+ println(getClass.getName + " exiting...")
+ }
+ def work()
+ }
+
+ class WriterThread(val log: Log) extends WorkerThread {
+ @volatile var offset = 0
+ override def work() {
- val offsets = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
- require(offsets._1 == offset && offsets._2 == offset)
++ val logAppendInfo = log.append(TestUtils.singleMessageSet(offset.toString.getBytes))
++ require(logAppendInfo.firstOffset == offset && logAppendInfo.lastOffset == offset)
+ offset += 1
+ if(offset % 1000 == 0)
+ Thread.sleep(500)
+ }
+ }
+
+ class ReaderThread(val log: Log) extends WorkerThread {
+ @volatile var offset = 0
+ override def work() {
+ try {
+ log.read(offset, 1024, Some(offset+1)) match {
+ case read: FileMessageSet if read.sizeInBytes > 0 => {
+ val first = read.head
+ require(first.offset == offset, "We should either read nothing or the message we asked for.")
+ require(MessageSet.entrySize(first.message) == read.sizeInBytes, "Expected %d but got %d.".format(MessageSet.entrySize(first.message), read.sizeInBytes))
+ offset += 1
+ }
+ case _ =>
+ }
+ } catch {
+ case e: OffsetOutOfRangeException => // this is okay
+ }
+ }
+ }
+ }