You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/04 23:30:59 UTC
git commit: KAFKA-657 Add APIs for the consumer to commit and fetch
offsets on the broker.
Updated Branches:
refs/heads/trunk a39c34a2d -> 222c0e46a
KAFKA-657 Add APIs for the consumer to commit and fetch offsets on the broker.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/222c0e46
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/222c0e46
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/222c0e46
Branch: refs/heads/trunk
Commit: 222c0e46ab78a12e1f58e3fb526667a4f73d344a
Parents: a39c34a
Author: David Arthur <mu...@gmail.com>
Authored: Fri Jan 4 14:30:03 2013 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Fri Jan 4 14:30:03 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/api/OffsetCommitRequest.scala | 100 +++++++++
.../scala/kafka/api/OffsetCommitResponse.scala | 93 ++++++++
.../main/scala/kafka/api/OffsetFetchRequest.scala | 89 ++++++++
.../main/scala/kafka/api/OffsetFetchResponse.scala | 100 +++++++++
core/src/main/scala/kafka/api/RequestKeys.scala | 6 +-
.../src/main/scala/kafka/common/ErrorMapping.scala | 4 +-
.../kafka/common/OffsetMetadataAndError.scala | 36 +++
.../common/OffsetMetadataTooLargeException.scala | 27 +++
.../main/scala/kafka/consumer/SimpleConsumer.scala | 14 ++
.../main/scala/kafka/javaapi/FetchRequest.scala | 4 -
core/src/main/scala/kafka/javaapi/Implicits.scala | 6 +
.../scala/kafka/javaapi/OffsetCommitRequest.scala | 55 +++++
.../scala/kafka/javaapi/OffsetCommitResponse.scala | 30 +++
.../scala/kafka/javaapi/OffsetFetchRequest.scala | 58 +++++
.../scala/kafka/javaapi/OffsetFetchResponse.scala | 30 +++
.../main/scala/kafka/javaapi/OffsetRequest.scala | 5 -
.../kafka/javaapi/consumer/SimpleConsumer.scala | 21 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 88 +++++++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 3 +
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../api/RequestResponseSerializationTest.scala | 65 ++++++-
.../unit/kafka/integration/TopicMetadataTest.scala | 4 +-
.../scala/unit/kafka/server/OffsetCommitTest.scala | 174 +++++++++++++++
.../scala/unit/kafka/server/SimpleFetchTest.scala | 4 +-
24 files changed, 998 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
new file mode 100644
index 0000000..0a34d5d
--- /dev/null
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import kafka.utils.Logging
+
+object OffsetCommitRequest extends Logging {
+ val CurrentVersion: Short = 0
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
+ // Read values from the envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = readShortString(buffer)
+
+ // Read the OffsetCommitRequest
+ val consumerGroupId = readShortString(buffer)
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topic = readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partitionId = buffer.getInt
+ val offset = buffer.getLong
+ val metadata = readShortString(buffer)
+ (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
+ })
+ })
+ OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
+ }
+}
+
+case class OffsetCommitRequest(groupId: String,
+ requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
+ versionId: Short = OffsetCommitRequest.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = OffsetCommitRequest.DefaultClientId)
+ extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
+
+ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+ def writeTo(buffer: ByteBuffer) {
+ // Write envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ writeShortString(buffer, clientId)
+
+ // Write OffsetCommitRequest
+ writeShortString(buffer, groupId) // consumer group
+ buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+ requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
+ writeShortString(buffer, t1._1) // topic
+ buffer.putInt(t1._2.size) // number of partitions for this topic
+ t1._2.foreach( t2 => {
+ buffer.putInt(t2._1.partition) // partition
+ buffer.putLong(t2._2.offset) // offset
+ writeShortString(buffer, t2._2.metadata) // metadata
+ })
+ })
+ }
+
+ override def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ shortStringLength(clientId) +
+ shortStringLength(groupId) +
+ 4 + /* topic count */
+ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+ val (topic, offsets) = topicAndOffsets
+ count +
+ shortStringLength(topic) + /* topic */
+ 4 + /* number of partitions */
+ offsets.foldLeft(0)((innerCount, offsetAndMetadata) => {
+ innerCount +
+ 4 /* partition */ +
+ 8 /* offset */ +
+ shortStringLength(offsetAndMetadata._2.metadata)
+ })
+ })
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
new file mode 100644
index 0000000..4e1313e
--- /dev/null
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetCommitResponse extends Logging {
+ val CurrentVersion: Short = 0
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
+ // Read values from the envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = readShortString(buffer)
+
+ // Read the OffsetCommitResponse
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topic = readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partitionId = buffer.getInt
+ val error = buffer.getShort
+ (TopicAndPartition(topic, partitionId), error)
+ })
+ })
+ OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId)
+ }
+}
+
+case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
+ versionId: Short = OffsetCommitResponse.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = OffsetCommitResponse.DefaultClientId)
+ extends RequestOrResponse {
+
+ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+ def writeTo(buffer: ByteBuffer) {
+ // Write envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ writeShortString(buffer, clientId)
+
+ // Write OffsetCommitResponse
+ buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+ requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short]
+ writeShortString(buffer, t1._1) // topic
+ buffer.putInt(t1._2.size) // number of partitions for this topic
+ t1._2.foreach( t2 => { // TopicAndPartition -> Short
+ buffer.putInt(t2._1.partition)
+ buffer.putShort(t2._2) //error
+ })
+ })
+ }
+
+ override def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ shortStringLength(clientId) +
+ 4 + /* topic count */
+ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+ val (topic, offsets) = topicAndOffsets
+ count +
+ shortStringLength(topic) + /* topic */
+ 4 + /* number of partitions */
+ offsets.size * (
+ 4 + /* partition */
+ 2 /* error */
+ )
+ })
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
new file mode 100644
index 0000000..63c1349
--- /dev/null
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetFetchRequest extends Logging {
+ val CurrentVersion: Short = 0
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
+ // Read values from the envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = readShortString(buffer)
+
+ // Read the OffsetFetchRequest
+ val consumerGroupId = readShortString(buffer)
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topic = readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partitionId = buffer.getInt
+ TopicAndPartition(topic, partitionId)
+ })
+ })
+ OffsetFetchRequest(consumerGroupId, pairs, versionId, correlationId, clientId)
+ }
+}
+
+case class OffsetFetchRequest(groupId: String,
+ requestInfo: Seq[TopicAndPartition],
+ versionId: Short = OffsetFetchRequest.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = OffsetFetchRequest.DefaultClientId)
+ extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
+
+ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
+
+ def writeTo(buffer: ByteBuffer) {
+ // Write envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ writeShortString(buffer, clientId)
+
+ // Write OffsetFetchRequest
+ writeShortString(buffer, groupId) // consumer group
+ buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+ requestInfoGroupedByTopic.foreach( t1 => { // (topic, Seq[TopicAndPartition])
+ writeShortString(buffer, t1._1) // topic
+ buffer.putInt(t1._2.size) // number of partitions for this topic
+ t1._2.foreach( t2 => {
+ buffer.putInt(t2.partition)
+ })
+ })
+ }
+
+ override def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ shortStringLength(clientId) +
+ shortStringLength(groupId) +
+ 4 + /* topic count */
+ requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
+ count + shortStringLength(t._1) + /* topic */
+ 4 + /* number of partitions */
+ t._2.size * 4 /* partition */
+ })
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
new file mode 100644
index 0000000..fb5e6cb
--- /dev/null
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import kafka.utils.Logging
+
+object OffsetFetchResponse extends Logging {
+ val CurrentVersion: Short = 0
+ val DefaultClientId = ""
+
+ def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
+ // Read values from the envelope
+ val versionId = buffer.getShort
+ val correlationId = buffer.getInt
+ val clientId = readShortString(buffer)
+
+ // Read the OffsetFetchResponse
+ val topicCount = buffer.getInt
+ val pairs = (1 to topicCount).flatMap(_ => {
+ val topic = readShortString(buffer)
+ val partitionCount = buffer.getInt
+ (1 to partitionCount).map(_ => {
+ val partitionId = buffer.getInt
+ val offset = buffer.getLong
+ val metadata = readShortString(buffer)
+ val error = buffer.getShort
+ (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
+ })
+ })
+ OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId)
+ }
+}
+
+case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
+ versionId: Short = OffsetFetchResponse.CurrentVersion,
+ correlationId: Int = 0,
+ clientId: String = OffsetFetchResponse.DefaultClientId)
+ extends RequestOrResponse {
+
+ lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+ def writeTo(buffer: ByteBuffer) {
+ // Write envelope
+ buffer.putShort(versionId)
+ buffer.putInt(correlationId)
+ writeShortString(buffer, clientId)
+
+ // Write OffsetFetchResponse
+ buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+ requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
+ writeShortString(buffer, t1._1) // topic
+ buffer.putInt(t1._2.size) // number of partitions for this topic
+ t1._2.foreach( t2 => { // TopicAndPartition -> OffsetMetadataAndError
+ buffer.putInt(t2._1.partition)
+ buffer.putLong(t2._2.offset)
+ writeShortString(buffer, t2._2.metadata)
+ buffer.putShort(t2._2.error)
+ })
+ })
+ }
+
+ override def sizeInBytes =
+ 2 + /* versionId */
+ 4 + /* correlationId */
+ shortStringLength(clientId) +
+ 4 + /* topic count */
+ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+ val (topic, offsets) = topicAndOffsets
+ count +
+ shortStringLength(topic) + /* topic */
+ 4 + /* number of partitions */
+ offsets.foldLeft(0)((innerCount, offsetsAndMetadata) => {
+ innerCount +
+ 4 /* partition */ +
+ 8 /* offset */ +
+ shortStringLength(offsetsAndMetadata._2.metadata) +
+ 2 /* error */
+ })
+ })
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index b000eb7..89ce92a 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -27,6 +27,8 @@ object RequestKeys {
val MetadataKey: Short = 3
val LeaderAndIsrKey: Short = 4
val StopReplicaKey: Short = 5
+ val OffsetCommitKey: Short = 6
+ val OffsetFetchKey: Short = 7
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -34,7 +36,9 @@ object RequestKeys {
OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
- StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom))
+ StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
+ OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
+ OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom))
def nameForKey(key: Short): String = {
keyToNameAndDeserializerMap.get(key) match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index c8769e0..153bc0b 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -41,6 +41,7 @@ object ErrorMapping {
val ReplicaNotAvailableCode: Short = 9
val MessageSizeTooLargeCode: Short = 10
val StaleControllerEpochCode: Short = 11
+ val OffsetMetadataTooLargeCode: Short = 12
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -54,7 +55,8 @@ object ErrorMapping {
classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
- classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
+ classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode,
+ classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
new file mode 100644
index 0000000..59608a3
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -0,0 +1,36 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class bundling an offset with its metadata string and error code.
+ */
+case class OffsetMetadataAndError(offset: Long, metadata: String = OffsetMetadataAndError.NoMetadata, error: Short = ErrorMapping.NoError) {
+
+ def this(tuple: (Long, String, Short)) = this(tuple._1, tuple._2, tuple._3)
+
+ def asTuple = (offset, metadata, error)
+
+ override def toString = "OffsetAndMetadata[%d,%s,%d]".format(offset, metadata, error)
+
+}
+
+object OffsetMetadataAndError {
+ val InvalidOffset: Long = -1L;
+ val NoMetadata: String = "";
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala b/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala
new file mode 100644
index 0000000..50edb27
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the client has specified offset metadata that exceeds the configured
+ * maximum size in bytes
+ */
+class OffsetMetadataTooLargeException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 6b83deb..cd8ef0b 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -175,6 +175,20 @@ class SimpleConsumer(val host: String,
*/
def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
+ /**
+ * Commit offsets for the topic/partitions listed in the request
+ * @param request a [[kafka.api.OffsetCommitRequest]] object.
+ * @return a [[kafka.api.OffsetCommitResponse]] object.
+ */
+ def commitOffsets(request: OffsetCommitRequest) = OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+
+ /**
+ * Fetch offsets for the topic/partitions listed in the request
+ * @param request a [[kafka.api.OffsetFetchRequest]] object.
+ * @return a [[kafka.api.OffsetFetchResponse]] object.
+ */
+ def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer)
+
private def getOrMakeConnection() {
if(!blockingChannel.isConnected) {
connect()
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index 44d148e..942427a 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -42,10 +42,6 @@ class FetchRequest(correlationId: Int,
)
}
- def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
-
- def sizeInBytes = underlying.sizeInBytes
-
override def toString = underlying.toString
override def equals(other: Any) = canEqual(other) && {
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/Implicits.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index cf82b38..66ab821 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -34,6 +34,12 @@ private[javaapi] object Implicits extends Logging {
implicit def toJavaOffsetResponse(response: kafka.api.OffsetResponse): kafka.javaapi.OffsetResponse =
new kafka.javaapi.OffsetResponse(response)
+ implicit def toJavaOffsetFetchResponse(response: kafka.api.OffsetFetchResponse): kafka.javaapi.OffsetFetchResponse =
+ new kafka.javaapi.OffsetFetchResponse(response)
+
+ implicit def toJavaOffsetCommitResponse(response: kafka.api.OffsetCommitResponse): kafka.javaapi.OffsetCommitResponse =
+ new kafka.javaapi.OffsetCommitResponse(response)
+
implicit def optionToJavaRef[T](opt: Option[T]): T = {
opt match {
case Some(obj) => obj
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
new file mode 100644
index 0000000..32033d6
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import collection.JavaConversions
+import java.nio.ByteBuffer
+
+class OffsetCommitRequest(groupId: String,
+ requestInfo: java.util.Map[TopicAndPartition, OffsetMetadataAndError],
+ versionId: Short,
+ correlationId: Int,
+ clientId: String) {
+ val underlying = {
+ val scalaMap = JavaConversions.asMap(requestInfo).toMap
+ kafka.api.OffsetCommitRequest(
+ groupId = groupId,
+ requestInfo = scalaMap,
+ versionId = versionId,
+ correlationId = correlationId,
+ clientId = clientId
+ )
+ }
+
+
+ override def toString = underlying.toString
+
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetCommitRequest]
+ this.underlying.equals(otherOffsetRequest.underlying)
+ }
+
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetCommitRequest]
+
+
+ override def hashCode = underlying.hashCode
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
new file mode 100644
index 0000000..d1c50c4
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+import collection.JavaConversions
+
+class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) {
+
+ def errors: java.util.Map[TopicAndPartition, Short] = {
+ JavaConversions.asMap(underlying.requestInfo)
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
new file mode 100644
index 0000000..64d134b
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+import collection.JavaConversions
+import java.nio.ByteBuffer
+
+class OffsetFetchRequest(groupId: String, // Java-facing wrapper that builds a kafka.api.OffsetFetchRequest from Java collection types
+ requestInfo: java.util.List[TopicAndPartition], // the topic/partition pairs to include in the request
+ versionId: Short,
+ correlationId: Int,
+ clientId: String) {
+
+ val underlying = { // the wrapped Scala request, built once when this object is constructed
+ val scalaSeq = JavaConversions.asBuffer(requestInfo) // Java list exposed as a Scala Buffer
+ kafka.api.OffsetFetchRequest(
+ groupId = groupId,
+ requestInfo = scalaSeq,
+ versionId = versionId,
+ correlationId = correlationId,
+ clientId = clientId
+ )
+ }
+
+
+ override def toString = underlying.toString
+
+
+ override def equals(other: Any) = canEqual(other) && { // equal only to another javaapi.OffsetFetchRequest whose underlying request is equal
+ val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetFetchRequest] // cast is safe: canEqual(other) was checked first
+ this.underlying.equals(otherOffsetRequest.underlying)
+ }
+
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetFetchRequest]
+
+
+ override def hashCode = underlying.hashCode // delegates to underlying, matching the equals definition above
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
new file mode 100644
index 0000000..9f83c1b
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import collection.JavaConversions
+
+class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse) { // Java-facing wrapper around the Scala kafka.api.OffsetFetchResponse
+
+ def offsets: java.util.Map[TopicAndPartition, OffsetMetadataAndError] = { // per-partition offset/metadata/error entries from underlying.requestInfo
+ JavaConversions.asMap(underlying.requestInfo) // exposes the Scala map as a java.util.Map; presumably a wrapper rather than a copy -- TODO confirm
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 1c77ff8..3565a15 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -38,11 +38,6 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
}
- def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
-
-
- def sizeInBytes = underlying.sizeInBytes
-
override def toString = underlying.toString
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
index 58c7081..0ab0195 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -21,7 +21,6 @@ import kafka.utils.threadsafe
import kafka.javaapi.FetchResponse
import kafka.javaapi.OffsetRequest
-
/**
* A consumer of kafka messages
*/
@@ -80,6 +79,26 @@ class SimpleConsumer(val host: String,
underlying.getOffsetsBefore(request.underlying)
}
+ /**
+ * Commit the offsets carried by the request by forwarding it to the underlying Scala consumer
+ * @param request a [[kafka.javaapi.OffsetCommitRequest]] object.
+ * @return a [[kafka.javaapi.OffsetCommitResponse]] object.
+ */
+ def commitOffsets(request: kafka.javaapi.OffsetCommitRequest): kafka.javaapi.OffsetCommitResponse = {
+ import kafka.javaapi.Implicits._ // presumably supplies the implicit conversion of the Scala response to the javaapi type -- verify in Implicits.scala
+ underlying.commitOffsets(request.underlying)
+ }
+
+ /**
+ * Fetch the offsets named by the request by forwarding it to the underlying Scala consumer
+ * @param request a [[kafka.javaapi.OffsetFetchRequest]] object.
+ * @return a [[kafka.javaapi.OffsetFetchResponse]] object.
+ */
+ def fetchOffsets(request: kafka.javaapi.OffsetFetchRequest): kafka.javaapi.OffsetFetchResponse = {
+ import kafka.javaapi.Implicits._ // presumably supplies the implicit conversion of the Scala response to the javaapi type -- verify in Implicits.scala
+ underlying.fetchOffsets(request.underlying)
+ }
+
def close() {
underlying.close
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ef3b66e..4283973 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,7 +22,7 @@ import kafka.api._
import kafka.message._
import kafka.network._
import kafka.log._
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{Pool, SystemTime, Logging, ZkUtils, ZKGroupTopicDirs}
import org.apache.log4j.Logger
import scala.collection._
import kafka.network.RequestChannel.Response
@@ -39,7 +39,8 @@ import kafka.common._
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val zkClient: ZkClient,
- brokerId: Int) extends Logging {
+ val brokerId: Int,
+ val config: KafkaConfig) extends Logging {
private val producerRequestPurgatory =
new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
@@ -62,6 +63,8 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+ case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
+ case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -118,6 +121,25 @@ class KafkaApis(val requestChannel: RequestChannel,
error("error when handling request %s".format(apiRequest), e)
val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ case RequestKeys.OffsetCommitKey =>
+ val apiRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ val responseMap = apiRequest.requestInfo.map {
+ case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ }.toMap
+ error("error when handling request %s".format(apiRequest), e)
+ val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=apiRequest.correlationId)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ case RequestKeys.OffsetFetchKey =>
+ val apiRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
+ val responseMap = apiRequest.requestInfo.map {
+ case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
+ offset=OffsetMetadataAndError.InvalidOffset,
+ error=ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ ))
+ }.toMap
+ error("error when handling request %s".format(apiRequest), e)
+ val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=apiRequest.correlationId)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
@@ -525,6 +547,68 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
+ /*
+ * Service the Offset commit API: write each committed offset to ZooKeeper and reply with one error code per partition
+ */
+ def handleOffsetCommitRequest(request: RequestChannel.Request) {
+ val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ if(requestLogger.isTraceEnabled)
+ requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString)
+ trace("Handling offset commit request " + offsetCommitRequest.toString) // same message, also sent to this class's own logger
+ val responseInfo = offsetCommitRequest.requestInfo.map( t => { // t is a (TopicAndPartition, OffsetMetadataAndError) entry; result is (TopicAndPartition, error code)
+ val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic) // ZooKeeper path helper for this group and topic
+ try {
+ if(t._2.metadata.length > config.offsetMetadataMaxSize) { // limit is compared against String.length (characters); only this entry is rejected
+ (t._1, ErrorMapping.OffsetMetadataTooLargeCode)
+ } else {
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+ t._1.partition, t._2.offset.toString) // NOTE(review): only the offset is written; the metadata string is not stored by this call
+ (t._1, ErrorMapping.NoError)
+ }
+ } catch {
+ case e => // untyped pattern: every Throwable is turned into an error code for this entry instead of failing the request
+ (t._1, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ }
+ })
+ val response = new OffsetCommitResponse(responseInfo,
+ offsetCommitRequest.versionId,
+ offsetCommitRequest.correlationId,
+ offsetCommitRequest.clientId)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // a single response is sent covering every entry
+ }
+
+ /*
+ * Service the Offset fetch API: read each requested partition's offset from ZooKeeper and reply with offset plus error code
+ */
+ def handleOffsetFetchRequest(request: RequestChannel.Request) {
+ val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
+ if(requestLogger.isTraceEnabled)
+ requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString)
+ trace("Handling offset fetch request " + offsetFetchRequest.toString) // same message, also sent to this class's own logger
+ val responseInfo = offsetFetchRequest.requestInfo.map( t => { // t is a TopicAndPartition; result is (TopicAndPartition, OffsetMetadataAndError)
+ val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic) // ZooKeeper path helper for this group and topic
+ try {
+ val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1 // first tuple element: the stored string, if any
+ payloadOpt match {
+ case Some(payload) => {
+ (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError)) // metadata keeps its default; a toLong failure falls into the catch below
+ }
+ case None => (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata, // nothing stored for this partition
+ ErrorMapping.UnknownTopicOrPartitionCode))
+ }
+ } catch {
+ case e => // untyped pattern: every Throwable is turned into an error entry for this partition
+ (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
+ ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+ }
+ })
+ val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), // responseInfo is a sequence of pairs, expanded here into an immutable Map
+ offsetFetchRequest.versionId,
+ offsetFetchRequest.correlationId,
+ offsetFetchRequest.clientId)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+ }
+
def close() {
debug("Shutting down.")
fetchRequestPurgatory.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 19c797c..264a411 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -173,4 +173,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the purge interval (in number of requests) of the producer request purgatory */
val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
+ /*********** Misc configuration ***********/
+ val offsetMetadataMaxSize = props.getInt("offset.metadata.max.size", 1024) // max length of the metadata string accepted with an offset commit (default 1024)
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 50ece6f..97416d7 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -80,7 +80,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
- apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
+ apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, config)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
Mx4jLoader.maybeLoad
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 509b020..c4b2986 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.cluster.Broker
import collection.mutable._
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.{TopicAndPartition, ErrorMapping, OffsetMetadataAndError}
import kafka.controller.LeaderIsrAndControllerEpoch
@@ -144,6 +144,36 @@ object SerializationTestUtils{
def createTestTopicMetadataResponse: TopicMetadataResponse = {
new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1)
}
+
+ def createTestOffsetCommitRequest: OffsetCommitRequest = { // fixture: commit request for group "group 1" covering two partitions of topic1
+ new OffsetCommitRequest("group 1", collection.immutable.Map(
+ TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="some metadata"),
+ TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(offset=100L, metadata=OffsetMetadataAndError.NoMetadata) // entry without metadata
+ ))
+ }
+
+ def createTestOffsetCommitResponse: OffsetCommitResponse = { // fixture: commit response with one successful and one failed partition
+ new OffsetCommitResponse(collection.immutable.Map(
+ TopicAndPartition(topic1, 0) -> ErrorMapping.NoError,
+ TopicAndPartition(topic1, 1) -> ErrorMapping.UnknownTopicOrPartitionCode
+ ))
+ }
+
+ def createTestOffsetFetchRequest: OffsetFetchRequest = { // fixture: fetch request for group "group 1" naming two partitions of topic1
+ new OffsetFetchRequest("group 1", Seq(
+ TopicAndPartition(topic1, 0),
+ TopicAndPartition(topic1, 1)
+ ))
+ }
+
+ def createTestOffsetFetchResponse: OffsetFetchResponse = { // fixture: fetch response with one successful and one failed partition
+ new OffsetFetchResponse(collection.immutable.Map(
+ TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", ErrorMapping.NoError),
+ TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadataAndError.NoMetadata, // entry without metadata, carrying an error code
+ ErrorMapping.UnknownTopicOrPartitionCode)
+ ))
+ }
+
}
class RequestResponseSerializationTest extends JUnitSuite {
@@ -158,6 +188,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest
private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
+ private val offsetCommitRequest = SerializationTestUtils.createTestOffsetCommitRequest
+ private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse
+ private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest
+ private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
@Test
@@ -238,5 +272,34 @@ class RequestResponseSerializationTest extends JUnitSuite {
val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
deserializedTopicMetadataResponse)
+
+ buffer = ByteBuffer.allocate(offsetCommitRequest.sizeInBytes)
+ offsetCommitRequest.writeTo(buffer)
+ buffer.rewind()
+ val deserializedOffsetCommitRequest = OffsetCommitRequest.readFrom(buffer)
+ assertEquals("The original and deserialzed offsetCommitRequest should be the same", offsetCommitRequest,
+ deserializedOffsetCommitRequest)
+
+ buffer = ByteBuffer.allocate(offsetCommitResponse.sizeInBytes)
+ offsetCommitResponse.writeTo(buffer)
+ buffer.rewind()
+ val deserializedOffsetCommitResponse = OffsetCommitResponse.readFrom(buffer)
+ assertEquals("The original and deserialzed offsetCommitResponse should be the same", offsetCommitResponse,
+ deserializedOffsetCommitResponse)
+
+ buffer = ByteBuffer.allocate(offsetFetchRequest.sizeInBytes)
+ offsetFetchRequest.writeTo(buffer)
+ buffer.rewind()
+ val deserializedOffsetFetchRequest = OffsetFetchRequest.readFrom(buffer)
+ assertEquals("The original and deserialzed offsetFetchRequest should be the same", offsetFetchRequest,
+ deserializedOffsetFetchRequest)
+
+ buffer = ByteBuffer.allocate(offsetFetchResponse.sizeInBytes)
+ offsetFetchResponse.writeTo(buffer)
+ buffer.rewind()
+ val deserializedOffsetFetchResponse = OffsetFetchResponse.readFrom(buffer)
+ assertEquals("The original and deserialzed offsetFetchResponse should be the same", offsetFetchResponse,
+ deserializedOffsetFetchResponse)
+
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 54a5a06..230119b 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -107,7 +107,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
// create the kafka request handler
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
+ val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1, configs.head)
// call the API (to be tested) to get metadata
apis.handleTopicMetadataRequest(new RequestChannel.Request
@@ -119,4 +119,4 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
topicMetadata
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
new file mode 100644
index 0000000..48d5647
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import kafka.utils._
+import junit.framework.Assert._
+import java.util.Properties
+import kafka.consumer.SimpleConsumer
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
+import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
+import kafka.utils.TestUtils._
+import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
+import scala.util.Random
+
+class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { // starts a broker via TestUtils.createServer and exercises offset commit/fetch through SimpleConsumer
+ val random: Random = new Random()
+ var logDir: File = null
+ var topicLogDir: File = null // not referenced elsewhere in this class
+ var server: KafkaServer = null
+ var logSize: Int = 100 // not referenced elsewhere in this class
+ val brokerPort: Int = 9099
+ var simpleConsumer: SimpleConsumer = null
+ var time: Time = new MockTime()
+
+ @Before
+ override def setUp() { // creates the broker and a consumer connected to it before each test
+ super.setUp()
+ val config: Properties = createBrokerConfig(1, brokerPort)
+ val logDirPath = config.getProperty("log.dir")
+ logDir = new File(logDirPath) // remembered so tearDown can delete it
+ time = new MockTime()
+ server = TestUtils.createServer(new KafkaConfig(config), time)
+ simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client")
+ }
+
+ @After
+ override def tearDown() { // closes the consumer, stops the broker and removes its log directory
+ simpleConsumer.close
+ server.shutdown
+ Utils.rm(logDir)
+ super.tearDown()
+ }
+
+ @Test
+ def testUpdateOffsets() { // commits an offset, reads it back, overwrites it and reads it again; metadata assertions are commented out
+ val topic = "topic"
+
+ // Commit an offset
+ val topicAndPartition = TopicAndPartition(topic, 0)
+ val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(offset=42L)))
+ val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+ assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
+
+ // Fetch it and verify
+ val fetchRequest = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+ val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
+
+ assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error)
+ //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
+ assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
+
+ // Commit a new offset
+ val commitRequest1 = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+ offset=100L,
+ metadata="some metadata"
+ )))
+ val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
+
+ assertEquals(ErrorMapping.NoError, commitResponse1.requestInfo.get(topicAndPartition).get)
+
+ // Fetch it and verify
+ val fetchRequest1 = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+ val fetchResponse1 = simpleConsumer.fetchOffsets(fetchRequest1)
+
+ assertEquals(ErrorMapping.NoError, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
+ //assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
+ assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
+
+ }
+
+ @Test
+ def testCommitAndFetchOffsets() { // commits four partitions in one request, then fetches those plus two that were never committed
+ val topic1 = "topic-1"
+ val topic2 = "topic-2"
+ val topic3 = "topic-3"
+ val topic4 = "topic-4"
+
+ val commitRequest = OffsetCommitRequest("test-group", Map(
+ TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="metadata one"),
+ TopicAndPartition(topic2, 0) -> OffsetMetadataAndError(offset=43L, metadata="metadata two"),
+ TopicAndPartition(topic3, 0) -> OffsetMetadataAndError(offset=44L, metadata="metadata three"),
+ TopicAndPartition(topic2, 1) -> OffsetMetadataAndError(offset=45L)
+ ))
+ val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+ assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get)
+
+ val fetchRequest = OffsetFetchRequest("test-group", Seq(
+ TopicAndPartition(topic1, 0),
+ TopicAndPartition(topic2, 0),
+ TopicAndPartition(topic3, 0),
+ TopicAndPartition(topic2, 1),
+ TopicAndPartition(topic3, 1), // An unknown partition
+ TopicAndPartition(topic4, 0) // An unknown topic
+ ))
+ val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
+
+ assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error)
+ assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
+ assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
+ assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
+ assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
+ assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
+
+ //assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
+ //assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)
+ //assertEquals("metadata three", fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.metadata)
+ //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
+ //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
+ //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
+
+ assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset)
+ assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset)
+ assertEquals(44L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.offset)
+ assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset)
+ assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
+ assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
+ }
+
+ @Test
+ def testLargeMetadataPayload() { // metadata of exactly offsetMetadataMaxSize chars is accepted; one more char yields OffsetMetadataTooLargeCode
+ val topicAndPartition = TopicAndPartition("large-metadata", 0)
+ val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+ offset=42L,
+ metadata=random.nextString(server.config.offsetMetadataMaxSize)
+ )))
+ val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+ assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
+
+ val commitRequest1 = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+ offset=42L,
+ metadata=random.nextString(server.config.offsetMetadataMaxSize + 1)
+ )))
+ val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
+
+ assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.requestInfo.get(topicAndPartition).get)
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 624d852..f3454f6 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -89,7 +89,7 @@ class SimpleFetchTest extends JUnit3Suite {
// start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
// don't provide replica or leader callbacks since they will not be tested here
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+ val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head)
// This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
val goodFetch = new FetchRequestBuilder()
@@ -156,7 +156,7 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.replay(replicaManager)
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+ val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head)
/**
* This fetch, coming from a replica, requests all data at offset "15". Because the request is coming