You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/03/14 23:14:51 UTC
[5/5] git commit: KAFKA-1012; Consumer offset management in Kafka;
patched by Tejas Patil and Joel Koshy;
feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram
Subramanian, Joe Stein, Chris Riccomini
KAFKA-1012; Consumer offset management in Kafka; patched by Tejas Patil and Joel Koshy; feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram Subramanian, Joe Stein, Chris Riccomini
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a670537a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a670537a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a670537a
Branch: refs/heads/trunk
Commit: a670537aa33732b15b56644d8ccc1681e16395f5
Parents: 84a3a9a
Author: Joel Koshy <jj...@gmail.com>
Authored: Fri Mar 14 15:14:33 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Mar 14 15:14:33 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/admin/TopicCommand.scala | 7 +-
.../kafka/api/ConsumerMetadataRequest.scala | 79 +++
.../kafka/api/ConsumerMetadataResponse.scala | 57 +++
.../scala/kafka/api/OffsetCommitRequest.scala | 57 ++-
.../scala/kafka/api/OffsetCommitResponse.scala | 40 +-
.../scala/kafka/api/OffsetFetchRequest.scala | 20 +-
core/src/main/scala/kafka/api/RequestKeys.scala | 4 +-
.../main/scala/kafka/client/ClientUtils.scala | 100 +++-
.../main/scala/kafka/cluster/Partition.scala | 28 +-
...nsumerCoordinatorNotAvailableException.scala | 22 +
.../main/scala/kafka/common/ErrorMapping.scala | 8 +-
.../NotCoordinatorForConsumerException.scala | 22 +
.../kafka/common/OffsetMetadataAndError.scala | 41 +-
.../common/OffsetsLoadInProgressException.scala | 26 +
core/src/main/scala/kafka/common/Topic.scala | 4 +
.../scala/kafka/consumer/ConsoleConsumer.scala | 16 +-
.../scala/kafka/consumer/ConsumerConfig.scala | 39 ++
.../kafka/consumer/ConsumerConnector.scala | 2 +-
.../scala/kafka/consumer/SimpleConsumer.scala | 11 +-
.../main/scala/kafka/consumer/TopicCount.scala | 14 +-
.../main/scala/kafka/consumer/TopicFilter.scala | 11 +-
.../consumer/ZookeeperConsumerConnector.scala | 300 +++++++++---
.../kafka/controller/KafkaController.scala | 4 +-
.../javaapi/ConsumerMetadataResponse.scala | 42 ++
.../kafka/javaapi/OffsetCommitRequest.scala | 13 +-
.../kafka/javaapi/OffsetCommitResponse.scala | 2 +-
.../javaapi/consumer/ConsumerConnector.java | 6 +-
.../consumer/ZookeeperConsumerConnector.scala | 8 +-
.../main/scala/kafka/log/FileMessageSet.scala | 4 +-
.../scala/kafka/producer/ProducerConfig.scala | 11 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 324 +++++++------
.../main/scala/kafka/server/KafkaConfig.scala | 44 +-
.../main/scala/kafka/server/KafkaServer.scala | 30 +-
.../main/scala/kafka/server/OffsetManager.scala | 480 +++++++++++++++++++
.../scala/kafka/server/ReplicaManager.scala | 19 +-
.../kafka/tools/ConsumerOffsetChecker.scala | 93 +++-
.../scala/kafka/tools/DumpLogSegments.scala | 8 +-
.../main/scala/kafka/tools/MirrorMaker.scala | 1 -
.../kafka/tools/ReplicaVerificationTool.scala | 5 +-
.../kafka/tools/VerifyConsumerRebalance.scala | 4 +-
.../kafka/utils/VerifiableProperties.scala | 20 +
core/src/main/scala/kafka/utils/ZkUtils.scala | 4 +-
.../scala/other/kafka/TestOffsetManager.scala | 291 +++++++++++
.../other/kafka/TestZKConsumerOffsets.scala | 73 ---
.../unit/kafka/admin/DeleteTopicTest.scala | 7 -
.../api/RequestResponseSerializationTest.scala | 155 ++----
.../unit/kafka/consumer/TopicFilterTest.scala | 24 +-
.../ZookeeperConsumerConnectorTest.scala | 4 +-
.../unit/kafka/server/OffsetCommitTest.scala | 111 ++---
.../unit/kafka/server/SimpleFetchTest.scala | 9 +-
.../migration_tool_test.py | 2 +-
system_test/mirror_maker/README | 22 -
system_test/mirror_maker/bin/expected.out | 18 -
system_test/mirror_maker/bin/run-test.sh | 357 --------------
.../config/blacklisttest.consumer.properties | 28 --
.../config/mirror_producer.properties | 30 --
.../config/server_source_1_1.properties | 76 ---
.../config/server_source_1_2.properties | 76 ---
.../config/server_source_2_1.properties | 76 ---
.../config/server_source_2_2.properties | 76 ---
.../config/server_target_1_1.properties | 78 ---
.../config/server_target_1_2.properties | 78 ---
.../config/whitelisttest_1.consumer.properties | 28 --
.../config/whitelisttest_2.consumer.properties | 28 --
.../config/zookeeper_source_1.properties | 18 -
.../config/zookeeper_source_2.properties | 18 -
.../config/zookeeper_target.properties | 18 -
.../mirror_maker_testsuite/mirror_maker_test.py | 2 +-
.../cluster_config.json | 103 ++++
.../config/console_consumer.properties | 2 +
.../config/producer_performance.properties | 0
.../config/server.properties | 144 ++++++
.../config/zookeeper.properties | 23 +
.../offset_management_test.py | 298 ++++++++++++
.../testcase_7001/testcase_7001_properties.json | 95 ++++
.../config/kafka_server_1.properties | 148 ++++++
.../config/kafka_server_2.properties | 148 ++++++
.../config/kafka_server_3.properties | 148 ++++++
.../config/kafka_server_4.properties | 148 ++++++
.../testcase_7002/config/zookeeper_0.properties | 24 +
.../testcase_7002/testcase_7002_properties.json | 127 +++++
.../replication_testsuite/replica_basic_test.py | 2 +-
system_test/utils/kafka_system_test_utils.py | 170 ++++++-
system_test/utils/testcase_env.py | 6 +
84 files changed, 3642 insertions(+), 1677 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index dc9b092..6fef9df 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.consumer.Whitelist
+import kafka.server.OffsetManager
+
object TopicCommand {
@@ -70,7 +72,7 @@ object TopicCommand {
if (opts.options.has(opts.topicOpt)) {
val topicsSpec = opts.options.valueOf(opts.topicOpt)
val topicsFilter = new Whitelist(topicsSpec)
- allTopics.filter(topicsFilter.isTopicAllowed)
+ allTopics.filter(topicsFilter.isTopicAllowed(_, excludeInternalTopics = false))
} else
allTopics
}
@@ -104,6 +106,9 @@ object TopicCommand {
println("Updated config for topic \"%s\".".format(topic))
}
if(opts.options.has(opts.partitionsOpt)) {
+ if (topic == OffsetManager.OffsetsTopicName) {
+ throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
+ }
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
new file mode 100644
index 0000000..dfad6e6
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+import kafka.common.ErrorMapping
+
+/**
+ * Companion of the ConsumerMetadataRequest case class: holds the protocol
+ * defaults and the deserializer used to rebuild a request from a buffer.
+ */
+object ConsumerMetadataRequest {
+  val CurrentVersion: Short = 0
+  val DefaultClientId = ""
+
+  /**
+   * Decodes a request from `buffer`. Fields are consumed in the same order in
+   * which `writeTo` emits them: version id (short), correlation id (int),
+   * client id and finally the consumer group (both read through
+   * ApiUtils.readShortString).
+   */
+  def readFrom(buffer: ByteBuffer): ConsumerMetadataRequest = {
+    // envelope; the buffer is read sequentially, so statement order matters
+    val version = buffer.getShort
+    val correlation = buffer.getInt
+    val client = ApiUtils.readShortString(buffer)
+
+    // request payload
+    val consumerGroup = ApiUtils.readShortString(buffer)
+
+    ConsumerMetadataRequest(group = consumerGroup,
+                            versionId = version,
+                            correlationId = correlation,
+                            clientId = client)
+  }
+
+}
+
+/**
+ * Request carrying a consumer group name, registered under
+ * RequestKeys.ConsumerMetadataKey.
+ *
+ * @param group         the consumer group being asked about
+ * @param versionId     version written into the request envelope
+ * @param correlationId id written into the request envelope
+ * @param clientId      client identifier written into the request envelope
+ */
+case class ConsumerMetadataRequest(group: String,
+                                   versionId: Short = ConsumerMetadataRequest.CurrentVersion,
+                                   override val correlationId: Int = 0,
+                                   clientId: String = ConsumerMetadataRequest.DefaultClientId)
+  extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey), correlationId) {
+
+  /** Number of bytes that `writeTo` puts into the buffer. */
+  def sizeInBytes = {
+    val envelopeSize =
+      2 + /* versionId */
+      4 + /* correlationId */
+      ApiUtils.shortStringLength(clientId)
+    envelopeSize + ApiUtils.shortStringLength(group)
+  }
+
+  /** Serializes the envelope followed by the group name into `buffer`. */
+  def writeTo(buffer: ByteBuffer): Unit = {
+    // envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    ApiUtils.writeShortString(buffer, clientId)
+
+    // consumer metadata request
+    ApiUtils.writeShortString(buffer, group)
+  }
+
+  /**
+   * Answers the request with ConsumerCoordinatorNotAvailableCode regardless of
+   * which error was raised while handling it.
+   */
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val failure = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode)
+    val send = new BoundedByteBufferSend(failure)
+    requestChannel.sendResponse(new Response(request, send))
+  }
+
+  /** One-line summary of the request; the `details` flag is not consulted. */
+  def describe(details: Boolean) = {
+    val fields = Seq(
+      "Name: " + this.getClass.getSimpleName,
+      "Version: " + versionId,
+      "CorrelationId: " + correlationId,
+      "ClientId: " + clientId,
+      "Group: " + group)
+    fields.mkString("; ")
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
new file mode 100644
index 0000000..6807f98
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+
+/**
+ * Companion of the ConsumerMetadataResponse case class: protocol version
+ * constant plus the deserializer.
+ */
+object ConsumerMetadataResponse {
+  val CurrentVersion = 0
+
+  /**
+   * Decodes a response from `buffer`: correlation id (int), error code
+   * (short) and, only when the error code is ErrorMapping.NoError, the
+   * coordinator broker via Broker.readFrom.
+   */
+  def readFrom(buffer: ByteBuffer) = {
+    val correlation = buffer.getInt
+    val code = buffer.getShort
+
+    // a broker is read from the buffer only for successful responses
+    val coordinator = code match {
+      case ErrorMapping.NoError => Some(Broker.readFrom(buffer))
+      case _ => None
+    }
+
+    ConsumerMetadataResponse(coordinator, code, correlation)
+  }
+
+}
+
+/**
+ * Response to a ConsumerMetadataRequest.
+ *
+ * @param coordinator   broker reported as coordinator; it is serialized only
+ *                      when `errorCode` is ErrorMapping.NoError
+ * @param errorCode     outcome of the lookup
+ * @param correlationId id echoed back to the requester
+ */
+case class ConsumerMetadataResponse (coordinator: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
+  extends RequestOrResponse(correlationId = correlationId) {
+
+  // The coordinator travels on the wire only for successful responses (readFrom
+  // in the companion mirrors this). Deriving both the size and the payload from
+  // this single value keeps sizeInBytes and writeTo in agreement even if an
+  // instance is built with Some(broker) together with an error code.
+  private def coordinatorOnWire: Option[Broker] =
+    if (errorCode == ErrorMapping.NoError) coordinator else None
+
+  /** Number of bytes that `writeTo` puts into the buffer. */
+  def sizeInBytes =
+    4 + /* correlationId */
+    2 + /* error code */
+    coordinatorOnWire.map(_.sizeInBytes).getOrElse(0)
+
+  /**
+   * Serializes the correlation id, the error code and - for successful
+   * responses - the coordinator broker.
+   *
+   * Throws NoSuchElementException if the error code is NoError but no
+   * coordinator was supplied (same exception type as before, now with a
+   * message).
+   */
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    if (errorCode == ErrorMapping.NoError) {
+      val broker = coordinator.getOrElse(
+        throw new NoSuchElementException("ConsumerMetadataResponse with NoError must carry a coordinator"))
+      broker.writeTo(buffer)
+    }
+  }
+
+  /** Same text as `toString`; the `details` flag is not consulted. */
+  def describe(details: Boolean) = toString
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 4d1fa5c..9f6956e 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -18,17 +18,20 @@
package kafka.api
import java.nio.ByteBuffer
-
import kafka.api.ApiUtils._
-import kafka.utils.Logging
+import kafka.utils.{SystemTime, Logging}
import kafka.network.{RequestChannel, BoundedByteBufferSend}
-import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
+import kafka.common.{OffsetAndMetadata, ErrorMapping, TopicAndPartition}
import kafka.network.RequestChannel.Response
+import scala.collection._
+
object OffsetCommitRequest extends Logging {
val CurrentVersion: Short = 0
val DefaultClientId = ""
def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
+ val now = SystemTime.milliseconds
+
// Read values from the envelope
val versionId = buffer.getShort
val correlationId = buffer.getInt
@@ -43,23 +46,45 @@ object OffsetCommitRequest extends Logging {
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
+ val timestamp = {
+ val given = buffer.getLong
+ if (given == -1L) now else given
+ }
val metadata = readShortString(buffer)
- (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
+ (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
})
})
- OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
+ OffsetCommitRequest(consumerGroupId, mutable.Map(pairs:_*), versionId, correlationId, clientId)
}
}
case class OffsetCommitRequest(groupId: String,
- requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
+ requestInfo: mutable.Map[TopicAndPartition, OffsetAndMetadata],
versionId: Short = OffsetCommitRequest.CurrentVersion,
override val correlationId: Int = 0,
clientId: String = OffsetCommitRequest.DefaultClientId)
extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey), correlationId) {
lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
-
+
+  /**
+   * Returns the entries of `requestInfo` whose metadata is either null or no
+   * longer than `maxMetadataSize` characters.
+   */
+  def filterLargeMetadata(maxMetadataSize: Int) =
+    requestInfo.filter { case (_, offsetAndMetadata) =>
+      val metadata = offsetAndMetadata.metadata
+      metadata == null || metadata.length <= maxMetadataSize
+    }
+
+  /**
+   * Builds the OffsetCommitResponse for this request from a single
+   * request-level error code.
+   *
+   * Per partition: metadata longer than `offsetMetadataMaxSize` yields
+   * OffsetMetadataTooLargeCode; otherwise UnknownTopicOrPartitionCode is
+   * reported as ConsumerCoordinatorNotAvailableCode, NotLeaderForPartitionCode
+   * as NotCoordinatorForConsumerCode, and any other code is passed through.
+   */
+  def responseFor(errorCode: Short, offsetMetadataMaxSize: Int) = {
+    // the translation of the request-level code is the same for every partition
+    val translatedCode: Short = errorCode match {
+      case ErrorMapping.UnknownTopicOrPartitionCode => ErrorMapping.ConsumerCoordinatorNotAvailableCode
+      case ErrorMapping.NotLeaderForPartitionCode => ErrorMapping.NotCoordinatorForConsumerCode
+      case other => other
+    }
+
+    val commitStatus = requestInfo.map { case (topicAndPartition, offsetAndMetadata) =>
+      val metadata = offsetAndMetadata.metadata
+      val metadataTooLarge = metadata != null && metadata.length > offsetMetadataMaxSize
+      (topicAndPartition, if (metadataTooLarge) ErrorMapping.OffsetMetadataTooLargeCode else translatedCode)
+    }.toMap
+
+    OffsetCommitResponse(commitStatus, correlationId)
+  }
+
+
def writeTo(buffer: ByteBuffer) {
// Write envelope
buffer.putShort(versionId)
@@ -73,9 +98,10 @@ case class OffsetCommitRequest(groupId: String,
writeShortString(buffer, t1._1) // topic
buffer.putInt(t1._2.size) // number of partitions for this topic
t1._2.foreach( t2 => {
- buffer.putInt(t2._1.partition) // partition
- buffer.putLong(t2._2.offset) // offset
- writeShortString(buffer, t2._2.metadata) // metadata
+ buffer.putInt(t2._1.partition)
+ buffer.putLong(t2._2.offset)
+ buffer.putLong(t2._2.timestamp)
+ writeShortString(buffer, t2._2.metadata)
})
})
}
@@ -95,15 +121,14 @@ case class OffsetCommitRequest(groupId: String,
innerCount +
4 /* partition */ +
8 /* offset */ +
+ 8 /* timestamp */ +
shortStringLength(offsetAndMetadata._2.metadata)
})
})
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val responseMap = requestInfo.map {
- case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- }.toMap
- val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=correlationId)
+ val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ val errorResponse = responseFor(errorCode, Int.MaxValue)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
@@ -119,7 +144,7 @@ case class OffsetCommitRequest(groupId: String,
offsetCommitRequest.toString()
}
- override def toString(): String = {
- describe(true)
+ override def toString = {
+ describe(details = true)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 9e1795f..4946e97 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -19,9 +19,8 @@ package kafka.api
import java.nio.ByteBuffer
-import kafka.api.ApiUtils._
-import kafka.common.TopicAndPartition
import kafka.utils.Logging
+import kafka.common.TopicAndPartition
object OffsetCommitResponse extends Logging {
val CurrentVersion: Short = 0
@@ -30,7 +29,7 @@ object OffsetCommitResponse extends Logging {
val correlationId = buffer.getInt
val topicCount = buffer.getInt
val pairs = (1 to topicCount).flatMap(_ => {
- val topic = readShortString(buffer)
+ val topic = ApiUtils.readShortString(buffer)
val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
@@ -42,37 +41,34 @@ object OffsetCommitResponse extends Logging {
}
}
-case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
+case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
override val correlationId: Int = 0)
extends RequestOrResponse(correlationId=correlationId) {
- lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+ lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
- buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
- requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short]
- writeShortString(buffer, t1._1) // topic
- buffer.putInt(t1._2.size) // number of partitions for this topic
- t1._2.foreach( t2 => { // TopicAndPartition -> Short
- buffer.putInt(t2._1.partition)
- buffer.putShort(t2._2) //error
- })
- })
+ buffer.putInt(commitStatusGroupedByTopic.size)
+ commitStatusGroupedByTopic.foreach { case(topic, statusMap) =>
+ ApiUtils.writeShortString(buffer, topic)
+ buffer.putInt(statusMap.size) // partition count
+ statusMap.foreach { case(topicAndPartition, errorCode) =>
+ buffer.putInt(topicAndPartition.partition)
+ buffer.putShort(errorCode)
+ }
+ }
}
override def sizeInBytes =
4 + /* correlationId */
4 + /* topic count */
- requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
- val (topic, offsets) = topicAndOffsets
+ commitStatusGroupedByTopic.foldLeft(0)((count, partitionStatusMap) => {
+ val (topic, partitionStatus) = partitionStatusMap
count +
- shortStringLength(topic) + /* topic */
- 4 + /* number of partitions */
- offsets.size * (
- 4 + /* partition */
- 2 /* error */
- )
+ ApiUtils.shortStringLength(topic) +
+ 4 + /* partition count */
+ partitionStatus.size * ( 4 /* partition */ + 2 /* error code */)
})
override def describe(details: Boolean):String = { toString }
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 7036532..a32f858 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -23,7 +23,7 @@ import kafka.api.ApiUtils._
import kafka.utils.Logging
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.network.RequestChannel.Response
-import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
object OffsetFetchRequest extends Logging {
val CurrentVersion: Short = 0
val DefaultClientId = ""
@@ -50,10 +50,10 @@ object OffsetFetchRequest extends Logging {
}
case class OffsetFetchRequest(groupId: String,
- requestInfo: Seq[TopicAndPartition],
- versionId: Short = OffsetFetchRequest.CurrentVersion,
- override val correlationId: Int = 0,
- clientId: String = OffsetFetchRequest.DefaultClientId)
+ requestInfo: Seq[TopicAndPartition],
+ versionId: Short = OffsetFetchRequest.CurrentVersion,
+ override val correlationId: Int = 0,
+ clientId: String = OffsetFetchRequest.DefaultClientId)
extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey), correlationId) {
lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
@@ -91,8 +91,8 @@ case class OffsetFetchRequest(groupId: String,
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val responseMap = requestInfo.map {
case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
- offset=OffsetMetadataAndError.InvalidOffset,
- error=ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+ offset = OffsetAndMetadata.InvalidOffset,
+ error = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
))
}.toMap
val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
@@ -111,7 +111,7 @@ case class OffsetFetchRequest(groupId: String,
offsetFetchRequest.toString()
}
- override def toString(): String = {
- describe(true)
+ override def toString: String = {
+ describe(details = true)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c81214f..fbfc9d3 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -31,6 +31,7 @@ object RequestKeys {
val ControlledShutdownKey: Short = 7
val OffsetCommitKey: Short = 8
val OffsetFetchKey: Short = 9
+ val ConsumerMetadataKey: Short = 10
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -42,7 +43,8 @@ object RequestKeys {
UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
- OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom))
+ OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
+ ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom))
def nameForKey(key: Short): String = {
keyToNameAndDeserializerMap.get(key) match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 1d2f81b..fc9e084 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -20,12 +20,16 @@ import scala.collection._
import kafka.cluster._
import kafka.api._
import kafka.producer._
-import kafka.common.KafkaException
+import kafka.common.{ErrorMapping, KafkaException}
import kafka.utils.{Utils, Logging}
import java.util.Properties
import util.Random
+ import kafka.network.BlockingChannel
+ import kafka.utils.ZkUtils._
+ import org.I0Itec.zkclient.ZkClient
+ import java.io.IOException
-/**
+ /**
* Helper functions common to clients (producer, consumer, or admin)
*/
object ClientUtils extends Logging{
@@ -103,5 +107,93 @@ object ClientUtils extends Logging{
new Broker(brokerId, hostName, port)
})
}
-
-}
\ No newline at end of file
+
+  /**
+   * Creates a blocking channel to a random broker.
+   *
+   * The broker list is re-read through getAllBrokersInCluster and shuffled on
+   * every pass; the method keeps looping until one connection attempt succeeds.
+   *
+   * @param zkClient        client handed to getAllBrokersInCluster
+   * @param socketTimeoutMs timeout passed to each BlockingChannel
+   * @return a channel on which connect() has completed
+   */
+  def channelToAnyBroker(zkClient: ZkClient, socketTimeoutMs: Int = 3000) : BlockingChannel = {
+    var openChannel: BlockingChannel = null
+    while (openChannel == null) {
+      val candidates = Random.shuffle(getAllBrokersInCluster(zkClient)).iterator
+      // try the shuffled brokers one by one and stop at the first success
+      while (openChannel == null && candidates.hasNext) {
+        val broker = candidates.next()
+        trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
+        var attempt: BlockingChannel = null
+        try {
+          attempt = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
+          attempt.connect()
+          debug("Created channel to broker %s:%d.".format(attempt.host, attempt.port))
+          openChannel = attempt
+        } catch {
+          case e: Exception =>
+            // release the half-open channel before moving on to the next broker
+            if (attempt != null) attempt.disconnect()
+            info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
+        }
+      }
+    }
+
+    openChannel
+  }
+
+  /**
+   * Creates a blocking channel to the offset manager of the given group.
+   *
+   * A ConsumerMetadataRequest is sent to an arbitrary broker until a response
+   * with ErrorMapping.NoError names the coordinator; a channel to that
+   * coordinator is then opened (or the query channel is reused when it already
+   * points at the coordinator). The method loops until it succeeds.
+   *
+   * @param group           consumer group whose offset manager is wanted
+   * @param zkClient        client used to discover brokers
+   * @param socketTimeoutMs timeout for the query channel and the offset manager channel
+   * @param retryBackOffMs  pause after a lookup that returned an error code and
+   *                        after a failed connection to the coordinator
+   * @return a connected channel to the group's offset manager
+   */
+  def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
+    // forward the caller's timeout instead of silently using channelToAnyBroker's default
+    var queryChannel = channelToAnyBroker(zkClient, socketTimeoutMs)
+
+    var offsetManagerChannelOpt: Option[BlockingChannel] = None
+
+    while (!offsetManagerChannelOpt.isDefined) {
+
+      var coordinatorOpt: Option[Broker] = None
+
+      while (!coordinatorOpt.isDefined) {
+        try {
+          if (!queryChannel.isConnected)
+            queryChannel = channelToAnyBroker(zkClient, socketTimeoutMs)
+          debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
+          queryChannel.send(ConsumerMetadataRequest(group))
+          val response = queryChannel.receive()
+          val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
+          debug("Consumer metadata response: " + consumerMetadataResponse.toString)
+          if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
+            coordinatorOpt = consumerMetadataResponse.coordinator
+          else {
+            // back off before asking again; retrying immediately would spin in a
+            // tight loop against the broker for as long as it keeps returning an error
+            debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
+                  .format(queryChannel.host, queryChannel.port, group, retryBackOffMs))
+            Thread.sleep(retryBackOffMs)
+          }
+        }
+        catch {
+          case ioe: IOException =>
+            info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
+            // dropping the channel makes the next iteration pick a new broker
+            queryChannel.disconnect()
+        }
+      }
+
+      // the inner loop only exits once coordinatorOpt is defined
+      coordinatorOpt.foreach { coordinator =>
+        if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) {
+          offsetManagerChannelOpt = Some(queryChannel)
+        } else {
+          val connectString = "%s:%d".format(coordinator.host, coordinator.port)
+          var offsetManagerChannel: BlockingChannel = null
+          try {
+            debug("Connecting to offset manager %s.".format(connectString))
+            offsetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port,
+                                                       BlockingChannel.UseDefaultBufferSize,
+                                                       BlockingChannel.UseDefaultBufferSize,
+                                                       socketTimeoutMs)
+            offsetManagerChannel.connect()
+            offsetManagerChannelOpt = Some(offsetManagerChannel)
+            queryChannel.disconnect()
+          }
+          catch {
+            case ioe: IOException => // offsets manager may have moved
+              info("Error while connecting to %s.".format(connectString))
+              if (offsetManagerChannel != null) offsetManagerChannel.disconnect()
+              Thread.sleep(retryBackOffMs)
+              // reset in case the failure came from queryChannel.disconnect() above,
+              // i.e. after offsetManagerChannelOpt had already been assigned
+              offsetManagerChannelOpt = None
+          }
+        }
+      }
+    }
+
+    offsetManagerChannelOpt.get
+  }
+ }
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 882b6da..0b88f14 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,7 +22,7 @@ import kafka.utils._
import java.lang.Object
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
-import kafka.server.ReplicaManager
+import kafka.server.{OffsetManager, ReplicaManager}
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
@@ -165,7 +165,8 @@ class Partition(val topic: String,
* and setting the new leader and ISR
*/
def makeLeader(controllerId: Int,
- partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
+ partitionStateInfo: PartitionStateInfo, correlationId: Int,
+ offsetManager: OffsetManager): Boolean = {
leaderIsrUpdateLock synchronized {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -186,6 +187,8 @@ class Partition(val topic: String,
leaderReplicaIdOpt = Some(localBrokerId)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(getReplica().get)
+ if (topic == OffsetManager.OffsetsTopicName)
+ offsetManager.loadOffsetsFromLog(partitionId)
true
}
}
@@ -196,7 +199,7 @@ class Partition(val topic: String,
*/
def makeFollower(controllerId: Int,
partitionStateInfo: PartitionStateInfo,
- correlationId: Int): Boolean = {
+ correlationId: Int, offsetManager: OffsetManager): Boolean = {
leaderIsrUpdateLock synchronized {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -212,12 +215,21 @@ class Partition(val topic: String,
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
+
+ leaderReplicaIdOpt.foreach { leaderReplica =>
+ if (topic == OffsetManager.OffsetsTopicName &&
+ /* if we are making a leader->follower transition */
+ leaderReplica == localBrokerId)
+ offsetManager.clearOffsetsInPartition(partitionId)
+ }
- if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId)
- return false;
-
- leaderReplicaIdOpt = Some(newLeaderBrokerId)
- true
+ if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
+ false
+ }
+ else {
+ leaderReplicaIdOpt = Some(newLeaderBrokerId)
+ true
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala b/core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala
new file mode 100644
index 0000000..8e02d26
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class ConsumerCoordinatorNotAvailableException(message: String) extends RuntimeException(message) { // unchecked exception; ErrorMapping pairs this class with ConsumerCoordinatorNotAvailableCode
+ def this() = this(null) // no-argument form: builds the exception with a null message
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index b0b5dce..5559d26 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -43,6 +43,9 @@ object ErrorMapping {
val StaleControllerEpochCode: Short = 11
val OffsetMetadataTooLargeCode: Short = 12
val StaleLeaderEpochCode: Short = 13
+ val OffsetsLoadInProgressCode: Short = 14
+ val ConsumerCoordinatorNotAvailableCode: Short = 15
+ val NotCoordinatorForConsumerCode: Short = 16
private val exceptionToCode =
Map[Class[Throwable], Short](
@@ -57,7 +60,10 @@ object ErrorMapping {
classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode,
- classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode
+ classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode,
+ classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
+ classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
+ classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala b/core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala
new file mode 100644
index 0000000..1eb74be
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class NotCoordinatorForConsumerException(message: String) extends RuntimeException(message) { // unchecked exception; ErrorMapping pairs this class with NotCoordinatorForConsumerCode
+ def this() = this(null) // no-argument form: builds the exception with a null message
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 59608a3..1586243 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -1,5 +1,3 @@
-package kafka.common
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,20 +15,41 @@ package kafka.common
* limitations under the License.
*/
-/**
- * Convenience case class since (topic, partition) pairs are ubiquitous.
- */
-case class OffsetMetadataAndError(offset: Long, metadata: String = OffsetMetadataAndError.NoMetadata, error: Short = ErrorMapping.NoError) {
+package kafka.common
- def this(tuple: (Long, String, Short)) = this(tuple._1, tuple._2, tuple._3)
+case class OffsetAndMetadata(offset: Long, // an offset together with optional metadata and an optional timestamp
+ metadata: String = OffsetAndMetadata.NoMetadata, // defaults to the empty string
+ timestamp: Long = -1L) { // -1 is treated by toString as "no timestamp"
+ override def toString = "OffsetAndMetadata[%d,%s%s]" // renders [offset,metadata] or [offset,metadata,timestamp]
+ .format(offset,
+ if (metadata != null && metadata.length > 0) metadata else "NO_METADATA", // null or empty metadata prints as NO_METADATA
+ if (timestamp == -1) "" else "," + timestamp.toString) // the timestamp and its leading comma are omitted when it is -1
+}
- def asTuple = (offset, metadata, error)
+object OffsetAndMetadata { // sentinel values shared by OffsetAndMetadata and OffsetMetadataAndError
+ val InvalidOffset: Long = -1L // offset used by OffsetMetadataAndError when no real offset is available
+ val NoMetadata: String = "" // default metadata for both case classes
+ val InvalidTime: Long = -1L // same value as the default timestamp of the case class
+}
+
+case class OffsetMetadataAndError(offset: Long,
+ metadata: String = OffsetAndMetadata.NoMetadata,
+ error: Short = ErrorMapping.NoError) {
+
+ def this(offsetMetadata: OffsetAndMetadata, error: Short) =
+ this(offsetMetadata.offset, offsetMetadata.metadata, error)
- override def toString = "OffsetAndMetadata[%d,%s,%d]".format(offset, metadata, error)
+ def this(error: Short) =
+ this(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, error)
+ def asTuple = (offset, metadata, error)
+
+ override def toString = "OffsetMetadataAndError[%d,%s,%d]".format(offset, metadata, error)
}
object OffsetMetadataAndError {
- val InvalidOffset: Long = -1L;
- val NoMetadata: String = "";
+ val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError)
+ val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode)
+ val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala b/core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala
new file mode 100644
index 0000000..1c8e96e
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that offsets are currently being loaded from disk into the cache so offset fetch requests cannot be satisfied.
+ */
+class OffsetsLoadInProgressException(message: String) extends RuntimeException(message) { // unchecked exception; ErrorMapping pairs this class with OffsetsLoadInProgressCode
+ def this() = this(null) // no-argument form: builds the exception with a null message
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index c1b9f65..ad75978 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,12 +18,16 @@
package kafka.common
import util.matching.Regex
+import kafka.server.OffsetManager
+
object Topic {
val legalChars = "[a-zA-Z0-9\\._\\-]"
private val maxNameLength = 255
private val rgx = new Regex(legalChars + "+")
+ val InternalTopics = Set(OffsetManager.OffsetsTopicName)
+
def validate(topic: String) {
if (topic.length <= 0)
throw new InvalidTopicException("topic name is illegal, can't be empty")
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index dc066c2..0f62819 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -123,7 +123,13 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("metrics dictory")
.ofType(classOf[java.lang.String])
-
+ val includeInternalTopicsOpt = parser.accepts("include-internal-topics", "Allow consuming internal topics.")
+ val offsetsStorageOpt = parser.accepts("offsets-storage", "Specify offsets storage backend (kafka/zookeeper).")
+ .withRequiredArg
+ .describedAs("Offsets storage method.")
+ .ofType(classOf[String])
+ .defaultsTo("zookeeper")
+ val dualCommitEnabledOpt = parser.accepts("dual-commit-enabled", "If offsets storage is kafka and this is set, then commit to zookeeper as well.")
val options: OptionSet = tryParse(parser, args)
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
@@ -153,6 +159,7 @@ object ConsoleConsumer extends Logging {
KafkaMetricsReporter.startReporters(verifiableProps)
}
+ val offsetsStorage = options.valueOf(offsetsStorageOpt)
val props = new Properties()
props.put("group.id", options.valueOf(groupIdOpt))
props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
@@ -166,6 +173,13 @@ object ConsoleConsumer extends Logging {
props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString)
+ props.put("offsets.storage", offsetsStorage)
+ if (options.has(includeInternalTopicsOpt))
+ props.put("exclude.internal.topics", "false")
+ if (options.has(dualCommitEnabledOpt))
+ props.put("dual.commit.enabled", "true")
+ else
+ props.put("dual.commit.enabled", "false")
val config = new ConsumerConfig(props)
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index e6875d6..1cf2f62 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -41,9 +41,14 @@ object ConsumerConfig extends Config {
val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
val MirrorConsumerNumThreads = 1
+ val OffsetsChannelBackoffMs = 1000
+ val OffsetsChannelSocketTimeoutMs = 10000
+ val OffsetsCommitMaxRetries = 5
+ val OffsetsStorage = "zookeeper"
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
+ val ExcludeInternalTopics = true
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
val DefaultClientId = ""
@@ -51,6 +56,7 @@ object ConsumerConfig extends Config {
validateClientId(config.clientId)
validateGroupId(config.groupId)
validateAutoOffsetReset(config.autoOffsetReset)
+ validateOffsetsStorage(config.offsetsStorage)
}
def validateClientId(clientId: String) {
@@ -69,6 +75,15 @@ object ConsumerConfig extends Config {
"Valid values are " + OffsetRequest.SmallestTimeString + " and " + OffsetRequest.LargestTimeString)
}
}
+
+ def validateOffsetsStorage(storage: String) { // accepts only "zookeeper" or "kafka"; any other value raises InvalidConfigException
+ storage match { // exact, case-sensitive match; ConsumerConfig lower-cases offsets.storage before validate() passes it here
+ case "zookeeper" => // valid, nothing to do
+ case "kafka" => // valid, nothing to do
+ case _ => throw new InvalidConfigException("Wrong value " + storage + " of offsets.storage in consumer config; " + // the rejected value is echoed in the message
+ "Valid values are 'zookeeper' and 'kafka'")
+ }
+ }
}
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
@@ -122,6 +137,27 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
/** backoff time to refresh the leader of a partition after it loses the current leader */
val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
+ /** backoff time to reconnect the offsets channel or to retry offset fetches/commits */
+ val offsetsChannelBackoffMs = props.getInt("offsets.channel.backoff.ms", OffsetsChannelBackoffMs)
+ /** socket timeout to use when reading responses for Offset Fetch/Commit requests. This timeout will also be used for
+ * the ConsumerMetadata requests that are used to query for the offset coordinator. */
+ val offsetsChannelSocketTimeoutMs = props.getInt("offsets.channel.socket.timeout.ms", OffsetsChannelSocketTimeoutMs)
+
+ /** Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during
+ * shut-down. It does not apply to commits from the auto-commit thread. It also does not apply to attempts to query
+ * for the offset coordinator before committing offsets. i.e., if a consumer metadata request fails for any reason,
+ * it is retried and that retry does not count toward this limit. */
+ val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)
+
+ /** Specify whether offsets should be committed to "zookeeper" (default) or "kafka" */
+ val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase
+
+ /** If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This
+ * is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any
+ * given consumer group, it is safe to turn this off after all instances within that group have been migrated to
+ * the new jar that commits offsets to the broker (instead of directly to ZooKeeper). */
+ val dualCommitEnabled = props.getBoolean("dual.commit.enabled", if (offsetsStorage == "kafka") true else false)
+
/* what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
@@ -136,6 +172,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
*/
val clientId = props.getString("client.id", groupId)
+ /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
+ val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
+
validate(this)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 13c3f77..07677c1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -70,7 +70,7 @@ trait ConsumerConnector {
/**
* Commit the offsets of all broker partitions connected by this connector.
*/
- def commitOffsets
+ def commitOffsets(retryOnFailure: Boolean = true)
/**
* Shut down the connector
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index fa7caa7..0e64632 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -92,6 +92,11 @@ class SimpleConsumer(val host: String,
TopicMetadataResponse.readFrom(response.buffer)
}
+ def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = { // sends a ConsumerMetadataRequest and returns the decoded response
+ val response = sendRequest(request) // same sendRequest helper that commitOffsets in this class uses
+ ConsumerMetadataResponse.readFrom(response.buffer) // decode the reply from the response buffer
+ }
+
/**
* Fetch a set of messages from a topic.
*
@@ -126,7 +131,11 @@ class SimpleConsumer(val host: String,
* @param request a [[kafka.api.OffsetCommitRequest]] object.
* @return a [[kafka.api.OffsetCommitResponse]] object.
*/
- def commitOffsets(request: OffsetCommitRequest) = OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+ def commitOffsets(request: OffsetCommitRequest) = {
+ // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before
+ // we can commit offsets.
+ OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+ }
/**
* Fetch offsets for a topic
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index e332633..c793110 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -47,7 +47,7 @@ private[kafka] object TopicCount extends Logging {
val blackListPattern = "black_list"
val staticPattern = "static"
- def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient) : TopicCount = {
+ def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = {
val dirs = new ZKGroupDirs(group)
val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1
var subscriptionPattern: String = null
@@ -85,15 +85,15 @@ private[kafka] object TopicCount extends Logging {
new Whitelist(regex)
else
new Blacklist(regex)
- new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
+ new WildcardTopicCount(zkClient, consumerId, filter, numStreams, excludeInternalTopics)
}
}
def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
new StaticTopicCount(consumerIdString, topicCount)
- def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkClient: ZkClient) =
- new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams)
+ def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkClient: ZkClient, excludeInternalTopics: Boolean) =
+ new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams, excludeInternalTopics)
}
@@ -119,9 +119,11 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
private[kafka] class WildcardTopicCount(zkClient: ZkClient,
consumerIdString: String,
topicFilter: TopicFilter,
- numStreams: Int) extends TopicCount {
+ numStreams: Int,
+ excludeInternalTopics: Boolean) extends TopicCount {
def getConsumerThreadIdsPerTopic = {
- val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
+ val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
+ .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 4f20823..5a13540 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -20,6 +20,7 @@ package kafka.consumer
import kafka.utils.Logging
import java.util.regex.{PatternSyntaxException, Pattern}
+import kafka.common.Topic
sealed abstract class TopicFilter(rawRegex: String) extends Logging {
@@ -41,12 +42,12 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
override def toString = regex
- def isTopicAllowed(topic: String): Boolean
+ def isTopicAllowed(topic: String, excludeInternalTopics: Boolean): Boolean
}
case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
- override def isTopicAllowed(topic: String) = {
- val allowed = topic.matches(regex)
+ override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
+ val allowed = topic.matches(regex) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
debug("%s %s".format(
topic, if (allowed) "allowed" else "filtered"))
@@ -58,8 +59,8 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
}
case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
- override def isTopicAllowed(topic: String) = {
- val allowed = !topic.matches(regex)
+ override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
+ val allowed = (!topic.matches(regex)) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
debug("%s %s".format(
topic, if (allowed) "allowed" else "filtered"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 703b2e2..9a3db90 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -34,7 +34,11 @@ import kafka.utils.Utils.inLock
import kafka.common._
import com.yammer.metrics.core.Gauge
import kafka.metrics._
+import kafka.network.BlockingChannel
+import kafka.client.ClientUtils
+import kafka.api._
import scala.Some
+import kafka.common.TopicAndPartition
/**
@@ -85,7 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var fetcher: Option[ConsumerFetcherManager] = None
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
- private var checkpointedOffsets = new Pool[TopicAndPartition, Long]
+ private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)
@@ -94,8 +98,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
private var loadBalancerListener: ZKRebalancerListener = null
+ private var offsetsChannel: BlockingChannel = null
+ private val offsetsChannelLock = new Object
+
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+ // useful for tracking migration of consumers to store offsets in kafka
+ private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
+ private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
+
val consumerIdString = {
var consumerUuid : String = null
config.consumerId match {
@@ -113,6 +124,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
connectZk()
createFetcher()
+ ensureOffsetManagerConnected()
+
if (config.autoCommitEnable) {
scheduler.startup
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
@@ -156,12 +169,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
}
+ // Blocks until the offset manager is located and a channel is established to it. Does nothing unless config.offsetsStorage is "kafka".
+ private def ensureOffsetManagerConnected() {
+ if (config.offsetsStorage == "kafka") {
+ if (offsetsChannel == null || !offsetsChannel.isConnected) // (re)create the channel only when there is none or it is no longer connected
+ offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient, config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs)
+
+ debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port)) // logged on every call, including when an existing channel is reused
+ }
+ }
+
def shutdown() {
- rebalanceLock synchronized {
- val canShutdown = isShuttingDown.compareAndSet(false, true);
- if (canShutdown) {
- info("ZKConsumerConnector shutting down")
+ val canShutdown = isShuttingDown.compareAndSet(false, true)
+ if (canShutdown) {
+ info("ZKConsumerConnector shutting down")
+ rebalanceLock synchronized {
if (wildcardTopicWatcher != null)
wildcardTopicWatcher.shutdown()
try {
@@ -178,6 +201,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
zkClient.close()
zkClient = null
}
+
+ if (offsetsChannel != null) offsetsChannel.disconnect()
} catch {
case e: Throwable =>
fatal("error during consumer connector shutdown", e)
@@ -240,7 +265,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def autoCommit() {
trace("auto committing")
try {
- commitOffsets()
+ commitOffsets(isAutoCommit = false)
}
catch {
case t: Throwable =>
@@ -249,30 +274,184 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
- def commitOffsets() {
- if (zkClient == null) {
- error("zk client is null. Cannot commit offsets")
- return
+ def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) { // writes one partition's offset for this consumer group to ZooKeeper and marks zkCommitMeter
+ val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) // ZooKeeper directories for this group and topic
+ updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString) // path is <consumerOffsetDir>/<partition>; the value is the offset as a string
+ zkCommitMeter.mark() // reached only if the write above did not throw
+ }
+
+ def commitOffsets(isAutoCommit: Boolean = true) { // commits every consumed offset that differs from its checkpoint, to ZooKeeper or to the Kafka offset manager depending on config.offsetsStorage
+ var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // NOTE(review): the flag reads inverted - true grants offsetsCommitMaxRetries extra attempts, and autoCommit() passes false so auto-commits get none; ConsumerConnector declares commitOffsets(retryOnFailure), presumably the same flag
+ var done = false
+
+ while (!done) {
+ val committed = offsetsChannelLock synchronized { // true when nothing needed committing, the ZooKeeper writes returned, or the response held no errors other than OffsetMetadataTooLarge
+ val offsetsToCommit = mutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) => // keep only partitions whose consumed offset differs from the last checkpointed value
+ partitionTopicInfos.filterNot { case (partition, info) =>
+ val newOffset = info.getConsumeOffset()
+ newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))
+ }.map { case (partition, info) =>
+ TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
+ }
+ }.toSeq:_*)
+
+ if (offsetsToCommit.size > 0) {
+ if (config.offsetsStorage == "zookeeper") { // ZooKeeper storage: write each offset, then record it as checkpointed
+ offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) =>
+ commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
+ checkpointedOffsets.put(topicAndPartition, offsetAndMetadata.offset)
+ }
+ true
+ } else { // otherwise ("kafka", the only other value validateOffsetsStorage accepts): send an OffsetCommitRequest over offsetsChannel
+ val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId)
+ ensureOffsetManagerConnected()
+ try {
+ kafkaCommitMeter.mark(offsetsToCommit.size) // marked before the send, so it counts attempted rather than acknowledged commits
+ offsetsChannel.send(offsetCommitRequest)
+ val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer)
+ trace("Offset commit response: %s.".format(offsetCommitResponse))
+
+ val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
+ offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) => // accumulates the four values over the per-partition error codes
+
+ if (errorCode == ErrorMapping.NoError) { // this partition committed: checkpoint it and, with dual commit enabled, mirror it to ZooKeeper
+ val offset = offsetsToCommit(topicPartition).offset
+ checkpointedOffsets.put(topicPartition, offset)
+ if (config.dualCommitEnabled) {
+ commitOffsetToZooKeeper(topicPartition, offset)
+ }
+ }
+
+ (folded._1 || // update commitFailed: set once any partition reports an error
+ errorCode != ErrorMapping.NoError,
+
+ folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
+ (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
+
+ folded._3 || // update shouldRefreshCoordinator: the coordinator moved or is unavailable
+ errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
+ errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
+
+ // update error count
+ folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
+ }
+ }
+ debug(errorCount + " errors in offset commit response.")
+
+
+ if (shouldRefreshCoordinator) {
+ debug("Could not commit offsets (because offset coordinator has moved or is unavailable).")
+ offsetsChannel.disconnect() // presumably makes the next ensureOffsetManagerConnected() call open a fresh channel
+ }
+
+ if (commitFailed && retryableIfFailed) // a failure made up solely of OffsetMetadataTooLarge errors still counts as committed
+ false
+ else
+ true
+ }
+ catch {
+ case t: Throwable => // any failure in the try block drops the channel and reports this attempt as not committed
+ error("Error while committing offsets.", t)
+ offsetsChannel.disconnect()
+ false
+ }
+ }
+ } else {
+ debug("No updates to offsets since last commit.")
+ true
+ }
+ }
+
+ done = if (isShuttingDown.get() && isAutoCommit) { // retries happen only while shutting down with isAutoCommit == true, bounded by retriesRemaining; every other call makes a single attempt
+ retriesRemaining -= 1
+ retriesRemaining == 0 || committed
+ } else
+ true
+
+ if (!done) {
+ debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs))
+ Thread.sleep(config.offsetsChannelBackoffMs) // back off before the next attempt
+ }
+ }
+ }
+
+ private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
+ val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
+ val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
+ offsetString match {
+ case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError))
+ case None => (topicPartition, OffsetMetadataAndError.NoOffset)
}
- for ((topic, infos) <- topicRegistry) {
- val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
- for (info <- infos.values) {
- val newOffset = info.getConsumeOffset
- if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))) {
+ }
+
+ private def fetchOffsets(partitions: Seq[TopicAndPartition]) = {
+ if (partitions.isEmpty)
+ Some(OffsetFetchResponse(Map.empty))
+ else if (config.offsetsStorage == "zookeeper") {
+ val offsets = partitions.map(fetchOffsetFromZooKeeper)
+ Some(OffsetFetchResponse(immutable.Map(offsets:_*)))
+ } else {
+ val offsetFetchRequest = OffsetFetchRequest(groupId = config.groupId, requestInfo = partitions, clientId = config.clientId)
+
+ var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None
+ while (!isShuttingDown.get && !offsetFetchResponseOpt.isDefined) {
+ offsetFetchResponseOpt = offsetsChannelLock synchronized {
+ ensureOffsetManagerConnected()
try {
- updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId, newOffset.toString)
- checkpointedOffsets.put(TopicAndPartition(topic, info.partitionId), newOffset)
- } catch {
- case t: Throwable =>
- // log it and let it go
- warn("exception during commitOffsets", t)
+ offsetsChannel.send(offsetFetchRequest)
+ val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer)
+ trace("Offset fetch response: %s.".format(offsetFetchResponse))
+
+ val (leaderChanged, loadInProgress) =
+ offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) =>
+ (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode),
+ folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode))
+ }
+
+ if (leaderChanged) {
+ offsetsChannel.disconnect()
+ debug("Could not fetch offsets (because offset manager has moved).")
+ None // retry
+ }
+ else if (loadInProgress) {
+ debug("Could not fetch offsets (because offset cache is being loaded).")
+ None // retry
+ }
+ else {
+ if (config.dualCommitEnabled) {
+ // if dual-commit is enabled (i.e., if a consumer group is migrating offsets to kafka), then pick the
+ // maximum between offsets in zookeeper and kafka.
+ val kafkaOffsets = offsetFetchResponse.requestInfo
+ val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) =>
+ val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset
+ val mostRecentOffset = zkOffset.max(kafkaOffset.offset)
+ (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, ErrorMapping.NoError))
+ }
+ Some(OffsetFetchResponse(mostRecentOffsets))
+ }
+ else
+ Some(offsetFetchResponse)
+ }
+ }
+ catch {
+ case e: Exception =>
+ error("Error while fetching offsets from %s:%d.".format(offsetsChannel.host, offsetsChannel.port), e)
+ offsetsChannel.disconnect()
+ None // retry
}
- debug("Committed offset " + newOffset + " for topic " + info)
+ }
+
+ if (offsetFetchResponseOpt.isEmpty) {
+ debug("Retrying offset fetch in %d ms".format(config.offsetsChannelBackoffMs))
+ Thread.sleep(config.offsetsChannelBackoffMs)
}
}
+
+ offsetFetchResponseOpt
}
}
+
class ZKSessionExpireListener(val dirs: ZKGroupDirs,
val consumerIdString: String,
val topicCount: TopicCount,
@@ -433,8 +612,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def rebalance(cluster: Cluster): Boolean = {
- val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
- val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
+ val myTopicThreadIdsMap = TopicCount.constructTopicCount(
+ group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
+ val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics)
val brokers = getAllBrokersInCluster(zkClient)
if (brokers.size == 0) {
// This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
@@ -458,13 +638,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
releasePartitionOwnership(topicRegistry)
- var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
+ var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]()
val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
- val topicDirs = new ZKGroupTopicDirs(group, topic)
val curConsumers = consumersPerTopicMap.get(topic).get
val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
@@ -490,27 +669,42 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
- addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
// record the partition ownership decision
- partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+ partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
}
}
}
}
- /**
- * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
- * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
- */
- if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
- info("Updating the cache")
- debug("Partitions per topic cache " + partitionsPerTopicMap)
- debug("Consumers per topic cache " + consumersPerTopicMap)
- topicRegistry = currentTopicRegistry
- updateFetcher(cluster)
- true
- } else {
+ // fetch current offsets for all topic-partitions
+ val topicPartitions = partitionOwnershipDecision.keySet.toSeq
+ val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
+
+ if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
false
+ else {
+ val offsetFetchResponse = offsetFetchResponseOpt.get
+ topicPartitions.foreach { topicAndPartition =>
+ val (topic, partition) = topicAndPartition.asTuple
+ val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
+ val threadId = partitionOwnershipDecision(topicAndPartition)
+ addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
+ }
+
+ /**
+ * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+ * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
+ */
+ if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
+ info("Updating the cache")
+ debug("Partitions per topic cache " + partitionsPerTopicMap)
+ debug("Consumers per topic cache " + consumersPerTopicMap)
+ topicRegistry = currentTopicRegistry
+ updateFetcher(cluster)
+ true
+ } else {
+ false
+ }
}
}
}
@@ -533,7 +727,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
* successfully and the fetchers restart to fetch more data chunks
**/
if (config.autoCommitEnable)
- commitOffsets
+ commitOffsets()
case None =>
}
}
@@ -578,11 +772,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
}
- private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, Int), String]): Boolean = {
+ private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, String]): Boolean = {
var successfullyOwnedPartitions : List[(String, Int)] = Nil
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
- val topic = partitionOwner._1._1
- val partition = partitionOwner._1._2
+ val topic = partitionOwner._1.topic
+ val partition = partitionOwner._1.partition
val consumerThreadId = partitionOwner._2
val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
try {
@@ -609,18 +803,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
- topicDirs: ZKGroupTopicDirs, partition: Int,
- topic: String, consumerThreadId: String) {
+ partition: Int, topic: String,
+ offset: Long, consumerThreadId: String) {
val partTopicInfoMap = currentTopicRegistry.get(topic)
- val znode = topicDirs.consumerOffsetDir + "/" + partition
- val offsetString = readDataMaybeNull(zkClient, znode)._1
- // If first time starting a consumer, set the initial offset to -1
- val offset =
- offsetString match {
- case Some(offsetStr) => offsetStr.toLong
- case None => PartitionTopicInfo.InvalidOffset
- }
val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
@@ -746,10 +932,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// bootstrap with existing topics
private var wildcardTopics =
getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
- .filter(topicFilter.isTopicAllowed)
+ .filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))
private val wildcardTopicCount = TopicCount.constructTopicCount(
- consumerIdString, topicFilter, numStreams, zkClient)
+ consumerIdString, topicFilter, numStreams, zkClient, config.excludeInternalTopics)
val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
@@ -764,7 +950,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def handleTopicEvent(allTopics: Seq[String]) {
debug("Handling topic event")
- val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed)
+ val updatedTopics = allTopics.filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))
val addedTopics = updatedTopics filterNot (wildcardTopics contains)
if (addedTopics.nonEmpty)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4deff9d..5db24a7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -604,7 +604,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
- def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
+ def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -1116,7 +1116,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
controllerContext.allTopics.contains(topicPartition.topic)) {
- onPreferredReplicaElection(Set(topicPartition), false)
+ onPreferredReplicaElection(Set(topicPartition), true)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
new file mode 100644
index 0000000..dfa9c42
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.cluster.Broker
+
+class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
+
+ def errorCode = underlying.errorCode
+
+ def coordinator: Broker = {
+ import kafka.javaapi.Implicits._
+ underlying.coordinator
+ }
+
+ override def equals(other: Any) = canEqual(other) && {
+ val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.ConsumerMetadataResponse]
+ this.underlying.equals(otherConsumerMetadataResponse.underlying)
+ }
+
+ def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.ConsumerMetadataResponse]
+
+ override def hashCode = underlying.hashCode
+
+ override def toString = underlying.toString
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 57b9d2a..6de320d 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -17,19 +17,18 @@
package kafka.javaapi
-import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
-import collection.JavaConversions
-import java.nio.ByteBuffer
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
class OffsetCommitRequest(groupId: String,
- requestInfo: java.util.Map[TopicAndPartition, OffsetMetadataAndError],
+ requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
versionId: Short,
correlationId: Int,
clientId: String) {
val underlying = {
- val scalaMap: Map[TopicAndPartition, OffsetMetadataAndError] = {
- import JavaConversions._
- requestInfo.toMap
+ val scalaMap: collection.mutable.Map[TopicAndPartition, OffsetAndMetadata] = {
+ import collection.JavaConversions._
+
+ collection.JavaConversions.asMap(requestInfo)
}
kafka.api.OffsetCommitRequest(
groupId = groupId,
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index 570bf31..c2d3d11 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -24,7 +24,7 @@ class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitRespons
def errors: java.util.Map[TopicAndPartition, Short] = {
import JavaConversions._
- underlying.requestInfo
+ underlying.commitStatus
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index c45c803..44d3d35 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -18,13 +18,12 @@
package kafka.javaapi.consumer;
+import java.util.List;
+import java.util.Map;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.serializer.Decoder;
-import java.util.List;
-import java.util.Map;
-
public interface ConsumerConnector {
/**
* Create a list of MessageStreams of type T for each topic.
@@ -62,6 +61,7 @@ public interface ConsumerConnector {
* Commit the offsets of all broker partitions connected by this connector.
*/
public void commitOffsets();
+ public void commitOffsets(boolean retryOnFailure);
/**
* Shut down the connector
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 58e83f6..1f95d9b 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -101,9 +101,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def createMessageStreamsByFilter(topicFilter: TopicFilter) =
createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())
-
+
def commitOffsets() {
- underlying.commitOffsets
+ underlying.commitOffsets()
+ }
+
+ def commitOffsets(retryOnFailure: Boolean) {
+ underlying.commitOffsets(retryOnFailure)
}
def shutdown() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index e1f8b97..b2652dd 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -255,8 +255,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
/**
* Read from the underlying file into the buffer starting at the given position
*/
- def readInto(buffer: ByteBuffer, position: Int): ByteBuffer = {
- channel.read(buffer, position)
+ def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
+ channel.read(buffer, relativePosition + this.start)
buffer.flip()
buffer
}