You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/08 03:33:55 UTC
kafka git commit: KAFKA-2068; KAFKA-2069; Replace OffsetCommit and OffsetFetch Request/…
Repository: kafka
Updated Branches:
refs/heads/trunk 2fed0c62d -> 8d0c298c8
KAFKA-2068; KAFKA-2069; Replace OffsetCommit and OffsetFetch Request/Response with o.a.k.c.requests equivalent
Author: Grant Henke <gr...@gmail.com>
Reviewers: Ismael Juma
Closes #927 from granthenke/offset-refactor
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d0c298c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d0c298c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d0c298c
Branch: refs/heads/trunk
Commit: 8d0c298c8c3283a7f8cffc4f68b3af87b0588e07
Parents: 2fed0c6
Author: Grant Henke <gr...@gmail.com>
Authored: Mon Mar 7 18:33:52 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Mar 7 18:33:52 2016 -0800
----------------------------------------------------------------------
.../kafka/coordinator/GroupCoordinator.scala | 18 +-
.../coordinator/GroupMetadataManager.scala | 33 +--
.../scala/kafka/network/RequestChannel.scala | 4 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 276 ++++++++++---------
.../scala/kafka/tools/DumpLogSegments.scala | 14 +-
.../GroupCoordinatorResponseTest.scala | 12 +-
6 files changed, 181 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index cb08358..36d7bbb 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -24,9 +24,10 @@ import kafka.log.LogConfig
import kafka.message.UncompressedCodec
import kafka.server._
import kafka.utils._
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.requests.{OffsetFetchResponse, JoinGroupRequest}
import scala.collection.{Map, Seq, immutable}
@@ -381,8 +382,8 @@ class GroupCoordinator(val brokerId: Int,
def handleCommitOffsets(groupId: String,
memberId: String,
generationId: Int,
- offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+ offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
var delayedOffsetStore: Option[DelayedStore] = None
if (!isActive.get) {
@@ -425,13 +426,16 @@ class GroupCoordinator(val brokerId: Int,
}
def handleFetchOffsets(groupId: String,
- partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+ partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
if (!isActive.get) {
- partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupCoordinatorNotAvailable)}.toMap
+ partitions.map { case topicPartition =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
} else if (!isCoordinatorForGroup(groupId)) {
- partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
+ partitions.map { case topicPartition =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
} else if (isCoordinatorLoadingInProgress(groupId)) {
- partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupLoading)}.toMap
+ partitions.map { case topicPartition =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
} else {
// return offsets blindly regardless the current group state since the group may be using
// Kafka commit storage without automatic group management
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 2c29172..cbdb854 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.types.Type.INT32
import org.apache.kafka.common.protocol.types.Type.INT64
import org.apache.kafka.common.protocol.types.Type.BYTES
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.OffsetFetchResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.utils.Time
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -243,10 +244,10 @@ class GroupMetadataManager(val brokerId: Int,
def prepareStoreOffsets(groupId: String,
consumerId: String,
generationId: Int,
- offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicAndPartition, Short] => Unit): DelayedStore = {
+ offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
// first filter out partitions with offset metadata size exceeding limit
- val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
+ val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
@@ -319,26 +320,26 @@ class GroupMetadataManager(val brokerId: Int,
* The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
* returns the current offset or it begins to sync the cache from the log (and returns an error code).
*/
- def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+ def getOffsets(group: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
trace("Getting offsets %s for group %s.".format(topicPartitions, group))
if (isGroupLocal(group)) {
if (topicPartitions.isEmpty) {
// Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
- (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
+ (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
}.toMap
} else {
- topicPartitions.map { topicAndPartition =>
- val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
+ topicPartitions.map { topicPartition =>
+ val groupTopicPartition = GroupTopicPartition(group, topicPartition)
(groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
}.toMap
}
} else {
debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
- topicPartitions.map { topicAndPartition =>
- val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
- (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup)
+ topicPartitions.map { topicPartition =>
+ val groupTopicPartition = GroupTopicPartition(group, topicPartition)
+ (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))
}.toMap
}
}
@@ -517,12 +518,12 @@ class GroupMetadataManager(val brokerId: Int,
* @param key The requested group-topic-partition
* @return If the key is present, return the offset and metadata; otherwise return None
*/
- private def getOffset(key: GroupTopicPartition) = {
+ private def getOffset(key: GroupTopicPartition): OffsetFetchResponse.PartitionData = {
val offsetAndMetadata = offsetsCache.get(key)
if (offsetAndMetadata == null)
- OffsetMetadataAndError.NoOffset
+ new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)
else
- OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
+ new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
}
/**
@@ -872,7 +873,7 @@ object GroupMetadataManager {
val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
- OffsetKey(version, GroupTopicPartition(group, TopicAndPartition(topic, partition)))
+ OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))
} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
// version 2 refers to offset
@@ -1009,10 +1010,10 @@ object GroupMetadataManager {
}
-case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
+case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
def this(group: String, topic: String, partition: Int) =
- this(group, new TopicAndPartition(topic, partition))
+ this(group, new TopicPartition(topic, partition))
override def toString =
"[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 219e2fb..916c438 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -64,9 +64,7 @@ object RequestChannel extends Logging {
private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
- ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
- ApiKeys.OFFSET_COMMIT.id -> OffsetCommitRequest.readFrom,
- ApiKeys.OFFSET_FETCH.id -> OffsetFetchRequest.readFrom
+ ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
)
// TODO: this will be removed once we migrated to client-side format
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2a289b4..8f3a2ad 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -39,7 +39,8 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
-StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse}
+StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse,
+OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Node}
@@ -209,107 +210,108 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset commit request
*/
def handleOffsetCommitRequest(request: RequestChannel.Request) {
- val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ val header = request.header
+ val offsetCommitRequest = request.body.asInstanceOf[OffsetCommitRequest]
- // reject the request immediately if not authorized to the group
+ // reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
- val errors = offsetCommitRequest.requestInfo.mapValues(_ => Errors.GROUP_AUTHORIZATION_FAILED.code)
- val response = OffsetCommitResponse(errors, offsetCommitRequest.correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
- return
- }
-
- // filter non-exist topics
- val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) =>
- !metadataCache.contains(topicAndPartition.topic)
- }
- val filteredRequestInfo = (offsetCommitRequest.requestInfo -- invalidRequestsInfo.keys)
+ val errorCode = new JShort(Errors.GROUP_AUTHORIZATION_FAILED.code)
+ val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
+ (topicPartition, errorCode)
+ }.toMap
+ val responseHeader = new ResponseHeader(header.correlationId)
+ val responseBody = new OffsetCommitResponse(results.asJava)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+ } else {
+ // filter non-existent topics
+ val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
+ !metadataCache.contains(topicPartition.topic)
+ }
+ val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
- val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
- case (topicAndPartition, offsetMetadata) =>
- authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
- }
+ val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
+ case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
+ }
- // the callback for sending an offset commit response
- def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
- val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ // the callback for sending an offset commit response
+ def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
+ val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
- mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
- if (errorCode != Errors.NONE.code) {
- debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s"
- .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId,
- topicAndPartition, Errors.forCode(errorCode).exceptionName))
+ mergedCommitStatus.foreach { case (topicPartition, errorCode) =>
+ if (errorCode != Errors.NONE.code) {
+ debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+ s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
+ }
}
+ val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
+
+ val responseHeader = new ResponseHeader(header.correlationId)
+ val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
- val combinedCommitStatus = mergedCommitStatus ++ invalidRequestsInfo.map(_._1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId)
- requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
- }
- if (authorizedRequestInfo.isEmpty)
- sendResponseCallback(Map.empty)
- else if (offsetCommitRequest.versionId == 0) {
- // for version 0 always store offsets to ZK
- val responseInfo = authorizedRequestInfo.map {
- case (topicAndPartition, metaAndError) => {
- val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
- try {
- if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
- (topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- } else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
- (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
- } else {
- zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" +
- topicAndPartition.partition, metaAndError.offset.toString)
- (topicAndPartition, Errors.NONE.code)
+ if (authorizedRequestInfo.isEmpty)
+ sendResponseCallback(Map.empty)
+ else if (header.apiVersion == 0) {
+ // for version 0 always store offsets to ZK
+ val responseInfo = authorizedRequestInfo.map {
+ case (topicPartition, partitionData) =>
+ val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
+ try {
+ if (metadataCache.getTopicMetadata(Set(topicPartition.topic), request.securityProtocol).size <= 0)
+ (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
+ (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
+ else {
+ zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
+ (topicPartition, Errors.NONE.code)
+ }
+ } catch {
+ case e: Throwable => (topicPartition, Errors.forException(e).code)
}
- } catch {
- case e: Throwable => (topicAndPartition, Errors.forException(e).code)
- }
}
- }
-
- sendResponseCallback(responseInfo)
- } else {
- // for version 1 and beyond store offsets in offset manager
-
- // compute the retention time based on the request version:
- // if it is v1 or not specified by user, we can use the default retention
- val offsetRetention =
- if (offsetCommitRequest.versionId <= 1 ||
- offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
- coordinator.offsetConfig.offsetsRetentionMs
- } else {
- offsetCommitRequest.retentionMs
+ sendResponseCallback(responseInfo)
+ } else {
+ // for version 1 and beyond store offsets in offset manager
+
+ // compute the retention time based on the request version:
+ // if it is v1 or not specified by user, we can use the default retention
+ val offsetRetention =
+ if (header.apiVersion <= 1 ||
+ offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
+ coordinator.offsetConfig.offsetsRetentionMs
+ else
+ offsetCommitRequest.retentionTime
+
+ // commit timestamp is always set to now.
+ // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+ // expire timestamp is computed differently for v1 and v2.
+ // - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
+ // - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
+ // - If v2 we use the default expiration timestamp
+ val currentTimestamp = SystemTime.milliseconds
+ val defaultExpireTimestamp = offsetRetention + currentTimestamp
+ val partitionData = authorizedRequestInfo.mapValues { partitionData =>
+ new OffsetAndMetadata(
+ offsetMetadata = OffsetMetadata(partitionData.offset, partitionData.metadata),
+ commitTimestamp = currentTimestamp,
+ expireTimestamp = {
+ if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
+ defaultExpireTimestamp
+ else
+ offsetRetention + partitionData.timestamp
+ }
+ )
}
- // commit timestamp is always set to now.
- // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
- // expire timestamp is computed differently for v1 and v2.
- // - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
- // - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
- // - If v2 we use the default expiration timestamp
- val currentTimestamp = SystemTime.milliseconds
- val defaultExpireTimestamp = offsetRetention + currentTimestamp
- val offsetData = authorizedRequestInfo.mapValues(offsetAndMetadata =>
- offsetAndMetadata.copy(
- commitTimestamp = currentTimestamp,
- expireTimestamp = {
- if (offsetAndMetadata.commitTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
- defaultExpireTimestamp
- else
- offsetRetention + offsetAndMetadata.commitTimestamp
- }
- )
- )
-
- // call coordinator to handle commit offset
- coordinator.handleCommitOffsets(
- offsetCommitRequest.groupId,
- offsetCommitRequest.memberId,
- offsetCommitRequest.groupGenerationId,
- offsetData,
- sendResponseCallback)
+ // call coordinator to handle commit offset
+ coordinator.handleCommitOffsets(
+ offsetCommitRequest.groupId,
+ offsetCommitRequest.memberId,
+ offsetCommitRequest.generationId,
+ partitionData,
+ sendResponseCallback)
+ }
}
}
@@ -699,61 +701,61 @@ class KafkaApis(val requestChannel: RequestChannel,
/*
* Handle an offset fetch request
*/
-
def handleOffsetFetchRequest(request: RequestChannel.Request) {
- val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
+ val header = request.header
+ val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
- // reject the request immediately if not authorized to the group
+ val responseHeader = new ResponseHeader(header.correlationId)
+ val offsetFetchResponse =
+ // reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
- val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_AUTHORIZATION_FAILED.code)
- val response = OffsetFetchResponse(offsetFetchRequest.requestInfo.map{ _ -> authorizationError}.toMap)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
- return
- }
-
- val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
- authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
- }
-
- val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.TOPIC_AUTHORIZATION_FAILED.code)
- val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
-
- val response = if (offsetFetchRequest.versionId == 0) {
- // version 0 reads offsets from ZK
- val responseInfo = authorizedTopicPartitions.map( topicAndPartition => {
- val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
- try {
- if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
- (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
- } else {
- val payloadOpt = zkUtils.readDataMaybeNull(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
- payloadOpt match {
- case Some(payload) => (topicAndPartition, OffsetMetadataAndError(payload.toLong))
- case None => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
+ val unauthorizedGroupResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_AUTHORIZATION_FAILED.code)
+ val results = offsetFetchRequest.partitions.asScala.map { topicPartition => (topicPartition, unauthorizedGroupResponse)}.toMap
+ new OffsetFetchResponse(results.asJava)
+ } else {
+ val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
+ authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
+ }
+ val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap
+ val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+
+ if (header.apiVersion == 0) {
+ // version 0 reads offsets from ZK
+ val responseInfo = authorizedTopicPartitions.map { topicPartition =>
+ val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
+ try {
+ if (metadataCache.getTopicMetadata(Set(topicPartition.topic), request.securityProtocol).isEmpty)
+ (topicPartition, unknownTopicPartitionResponse)
+ else {
+ val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
+ payloadOpt match {
+ case Some(payload) =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong, "", Errors.NONE.code))
+ case None =>
+ (topicPartition, unknownTopicPartitionResponse)
+ }
}
+ } catch {
+ case e: Throwable =>
+ (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "",
+ Errors.forException(e).code))
}
- } catch {
- case e: Throwable =>
- (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,
- Errors.forException(e).code))
- }
- })
-
- OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedStatus, offsetFetchRequest.correlationId)
- } else {
- // version 1 reads offsets from Kafka;
- val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
+ }.toMap
+ new OffsetFetchResponse((responseInfo ++ unauthorizedStatus).asJava)
+ } else {
+ // version 1 reads offsets from Kafka;
+ val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
- // Note that we do not need to filter the partitions in the
- // metadata cache as the topic partitions will be filtered
- // in coordinator's offset manager through the offset cache
- OffsetFetchResponse(offsets ++ unauthorizedStatus, offsetFetchRequest.correlationId)
+ // Note that we do not need to filter the partitions in the
+ // metadata cache as the topic partitions will be filtered
+ // in coordinator's offset manager through the offset cache
+ new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava)
+ }
}
- trace("Sending offset fetch response %s for correlation id %d to client %s."
- .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
-
- requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
+ trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
+ requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, offsetFetchResponse)))
}
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index afba1ad..e882a30 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -65,7 +65,7 @@ object DumpLogSegments {
CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
val options = parser.parse(args : _*)
-
+
CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
val print = if(options.has(printOpt)) true else false
@@ -75,7 +75,7 @@ object DumpLogSegments {
val files = options.valueOf(filesOpt).split(",")
val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
val isDeepIteration = if(options.has(deepIterationOpt)) true else false
-
+
val messageParser = if (options.has(offsetsOpt)) {
new OffsetsMessageParser
} else {
@@ -114,7 +114,7 @@ object DumpLogSegments {
}
}
}
-
+
/* print out the contents of the index */
private def dumpIndex(file: File,
indexSanityOnly: Boolean,
@@ -181,10 +181,10 @@ object DumpLogSegments {
private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer) = {
val group = offsetKey.key.group
- val (topic, partition) = offsetKey.key.topicPartition.asTuple
+ val topicPartition = offsetKey.key.topicPartition
val offset = GroupMetadataManager.readOffsetMessageValue(payload)
- val keyString = s"offset::${group}:${topic}:${partition}"
+ val keyString = s"offset::${group}:${topicPartition.topic}:${topicPartition.partition}"
val valueString = if (offset.metadata.isEmpty)
String.valueOf(offset.offset)
else
@@ -306,5 +306,5 @@ object DumpLogSegments {
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 90e2b95..50fa09e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -48,8 +48,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
type SyncGroupCallback = (Array[Byte], Short) => Unit
type HeartbeatCallbackParams = Short
type HeartbeatCallback = Short => Unit
- type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
- type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
+ type CommitOffsetCallbackParams = Map[TopicPartition, Short]
+ type CommitOffsetCallback = Map[TopicPartition, Short] => Unit
type LeaveGroupCallbackParams = Short
type LeaveGroupCallback = Short => Unit
@@ -574,7 +574,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
@Test
def testCommitOffsetFromUnknownGroup() {
val generationId = 1
- val tp = new TopicAndPartition("topic", 0)
+ val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset))
@@ -583,7 +583,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
@Test
def testCommitOffsetWithDefaultGeneration() {
- val tp = new TopicAndPartition("topic", 0)
+ val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
@@ -594,7 +594,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
@Test
def testCommitOffsetInAwaitingSync() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val tp = new TopicAndPartition("topic", 0)
+ val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
@@ -900,7 +900,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
private def commitOffsets(groupId: String,
consumerId: String,
generationId: Int,
- offsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+ offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
val (responseFuture, responseCallback) = setupCommitOffsetsCallback
val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()