You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/13 07:32:44 UTC
kafka git commit: KAFKA-1841;
OffsetCommitRequest API - timestamp field is not versioned; patched by Jun Rao;
reviewed by Joel Koshy
Repository: kafka
Updated Branches:
refs/heads/0.8.2 9b6744d3a -> 432d397af
KAFKA-1841; OffsetCommitRequest API - timestamp field is not versioned; patched by Jun Rao; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/432d397a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/432d397a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/432d397a
Branch: refs/heads/0.8.2
Commit: 432d397af8a1d4467fe8041bcff8790864010a80
Parents: 9b6744d
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Jan 12 22:32:31 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 12 22:32:31 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/common/protocol/Protocol.java | 36 +++++++-
.../common/requests/OffsetCommitRequest.java | 22 ++++-
.../scala/kafka/api/OffsetCommitRequest.scala | 21 +++--
.../scala/kafka/api/OffsetCommitResponse.scala | 5 +-
.../scala/kafka/api/OffsetFetchRequest.scala | 4 +-
.../kafka/common/OffsetMetadataAndError.scala | 2 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 96 ++++++++++++++++----
.../api/RequestResponseSerializationTest.scala | 4 +-
8 files changed, 151 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..f0a262e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -111,6 +111,16 @@ public class Protocol {
new Field("offset",
INT64,
"Message offset to be committed."),
+ new Field("metadata",
+ STRING,
+ "Any associated metadata the client wants to keep."));
+
+ public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("offset",
+ INT64,
+ "Message offset to be committed."),
new Field("timestamp",
INT64,
"Timestamp of the commit"),
@@ -125,6 +135,13 @@ public class Protocol {
new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
"Partitions to commit offsets."));
+ public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
+ STRING,
+ "Topic to commit."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
+ "Partitions to commit offsets."));
+
public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
STRING,
"The consumer group id."),
@@ -142,7 +159,7 @@ public class Protocol {
STRING,
"The consumer id assigned by the group coordinator."),
new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
"Topics to commit offsets."));
public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
@@ -158,9 +175,11 @@ public class Protocol {
public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
- public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
/* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
- public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+ public static Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
+
+ public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
+ public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1};
/* Offset fetch api */
public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -181,6 +200,10 @@ public class Protocol {
new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
"Topics to fetch offsets."));
+ // version 0 and 1 have exactly the same wire format, but different functionality.
+ // version 0 will read the offsets from ZK and version 1 will read the offsets from Kafka.
+ public static Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
+
public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
@@ -200,8 +223,11 @@ public class Protocol {
public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
- public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 };
- public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 };
+ /* The response types for both V0 and V1 of OFFSET_FETCH_RESPONSE are the same. */
+ public static Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
+
+ public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1 };
+ public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1 };
/* List offset api */
public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 3ee5cba..66c0772 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -47,6 +47,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
public static final int DEFAULT_GENERATION_ID = -1;
public static final String DEFAULT_CONSUMER_ID = "";
+ public static final long DEFAULT_TIMESTAMP = -1L;
private final String groupId;
private final int generationId;
@@ -58,6 +59,11 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
public final long timestamp;
public final String metadata;
+ // for v0
+ public PartitionData(long offset, String metadata) {
+ this(offset, DEFAULT_TIMESTAMP, metadata);
+ }
+
public PartitionData(long offset, long timestamp, String metadata) {
this.offset = offset;
this.timestamp = timestamp;
@@ -73,7 +79,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
@Deprecated
public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
- initCommonFields(groupId, offsetData);
+ initCommonFields(groupId, offsetData, 0);
this.groupId = groupId;
this.generationId = DEFAULT_GENERATION_ID;
this.consumerId = DEFAULT_CONSUMER_ID;
@@ -90,7 +96,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
super(new Struct(curSchema));
- initCommonFields(groupId, offsetData);
+ initCommonFields(groupId, offsetData, 1);
struct.set(GENERATION_ID_KEY_NAME, generationId);
struct.set(CONSUMER_ID_KEY_NAME, consumerId);
this.groupId = groupId;
@@ -99,7 +105,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
this.offsetData = offsetData;
}
- private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+ private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData, int versionId) {
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
struct.set(GROUP_ID_KEY_NAME, groupId);
@@ -113,7 +119,8 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
- partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
+ if (versionId == 1)
+ partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
partitionArray.add(partitionData);
}
@@ -133,7 +140,12 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
- long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+ long timestamp;
+ // timestamp only exists in v1
+ if (partitionResponse.hasField(TIMESTAMP_KEY_NAME))
+ timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+ else
+ timestamp = DEFAULT_TIMESTAMP;
String metadata = partitionResponse.getString(METADATA_KEY_NAME);
PartitionData partitionData = new PartitionData(offset, timestamp, metadata);
offsetData.put(new TopicPartition(topic, partition), partitionData);
http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 861a6cf..39607c7 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -30,8 +30,6 @@ object OffsetCommitRequest extends Logging {
val DefaultClientId = ""
def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
- val now = SystemTime.milliseconds
-
// Read values from the envelope
val versionId = buffer.getShort
assert(versionId == 0 || versionId == 1,
@@ -59,8 +57,11 @@ object OffsetCommitRequest extends Logging {
val partitionId = buffer.getInt
val offset = buffer.getLong
val timestamp = {
- val given = buffer.getLong
- if (given == -1L) now else given
+ if (versionId == 1) {
+ val given = buffer.getLong
+ given
+ } else
+ OffsetAndMetadata.InvalidTime
}
val metadata = readShortString(buffer)
(TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
@@ -68,6 +69,13 @@ object OffsetCommitRequest extends Logging {
})
OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId)
}
+
+ def changeInvalidTimeToCurrentTime(offsetCommitRequest: OffsetCommitRequest) {
+ val now = SystemTime.milliseconds
+ for ( (topicAndPartiiton, offsetAndMetadata) <- offsetCommitRequest.requestInfo)
+ if (offsetAndMetadata.timestamp == OffsetAndMetadata.InvalidTime)
+ offsetAndMetadata.timestamp = now
+ }
}
case class OffsetCommitRequest(groupId: String,
@@ -121,7 +129,8 @@ case class OffsetCommitRequest(groupId: String,
t1._2.foreach( t2 => {
buffer.putInt(t2._1.partition)
buffer.putLong(t2._2.offset)
- buffer.putLong(t2._2.timestamp)
+ if (versionId == 1)
+ buffer.putLong(t2._2.timestamp)
writeShortString(buffer, t2._2.metadata)
})
})
@@ -143,7 +152,7 @@ case class OffsetCommitRequest(groupId: String,
innerCount +
4 /* partition */ +
8 /* offset */ +
- 8 /* timestamp */ +
+ (if (versionId == 1) 8 else 0 ) /* timestamp */ +
shortStringLength(offsetAndMetadata._2.metadata)
})
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 624a1c1..03dd736 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -23,7 +23,7 @@ import kafka.utils.Logging
import kafka.common.TopicAndPartition
object OffsetCommitResponse extends Logging {
- val CurrentVersion: Short = 0
+ val CurrentVersion: Short = 1
def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
val correlationId = buffer.getInt
@@ -41,6 +41,9 @@ object OffsetCommitResponse extends Logging {
}
}
+/**
+ * Single constructor for both version 0 and 1 since they have the same format.
+ */
case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
correlationId: Int = 0)
extends RequestOrResponse() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index c7604b9..74ee829 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -25,7 +25,9 @@ import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
object OffsetFetchRequest extends Logging {
- val CurrentVersion: Short = 0
+ // version 0 and 1 have exactly the same wire format, but different functionality.
+ // version 0 will read the offsets from ZK and version 1 will read the offsets from Kafka.
+ val CurrentVersion: Short = 1
val DefaultClientId = ""
def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 4cabffe..db7157d 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -19,7 +19,7 @@ package kafka.common
case class OffsetAndMetadata(offset: Long,
metadata: String = OffsetAndMetadata.NoMetadata,
- timestamp: Long = -1L) {
+ var timestamp: Long = -1L) {
override def toString = "OffsetAndMetadata[%d,%s%s]"
.format(offset,
if (metadata != null && metadata.length > 0) metadata else "NO_METADATA",
http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9a61fcb..d626b17 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -25,7 +25,7 @@ import kafka.network._
import kafka.admin.AdminUtils
import kafka.network.RequestChannel.Response
import kafka.controller.KafkaController
-import kafka.utils.{SystemTime, Logging}
+import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
import scala.collection._
@@ -64,7 +64,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
- case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request)
+ case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
@@ -77,6 +77,40 @@ class KafkaApis(val requestChannel: RequestChannel,
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
+ def handleOffsetCommitRequest(request: RequestChannel.Request) {
+ val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ if (offsetCommitRequest.versionId == 0) {
+ // version 0 stores the offsets in ZK
+ val responseInfo = offsetCommitRequest.requestInfo.map{
+ case (topicAndPartition, metaAndError) => {
+ val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
+ try {
+ ensureTopicExists(topicAndPartition.topic)
+ if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
+ (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+ } else {
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+ topicAndPartition.partition, metaAndError.offset.toString)
+ (topicAndPartition, ErrorMapping.NoError)
+ }
+ } catch {
+ case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ }
+ }
+ }
+ val response = new OffsetCommitResponse(responseInfo, offsetCommitRequest.correlationId)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+ } else {
+ // version 1 and above store the offsets in a special Kafka topic
+ handleProducerOrOffsetCommitRequest(request)
+ }
+ }
+
+ private def ensureTopicExists(topic: String) = {
+ if (metadataCache.getTopicMetadata(Set(topic)).size <= 0)
+ throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
+ }
+
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
@@ -154,6 +188,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (produceRequest, offsetCommitRequestOpt) =
if (request.requestId == RequestKeys.OffsetCommitKey) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+ OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest)
(producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
} else {
(request.requestObj.asInstanceOf[ProducerRequest], None)
@@ -504,22 +539,47 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetFetchRequest(request: RequestChannel.Request) {
val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
- val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
- metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
- )
- val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
- val knownStatus =
- if (knownTopicPartitions.size > 0)
- offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
- else
- Map.empty[TopicAndPartition, OffsetMetadataAndError]
- val status = unknownStatus ++ knownStatus
-
- val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
-
- trace("Sending offset fetch response %s for correlation id %d to client %s."
- .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
- requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+ if (offsetFetchRequest.versionId == 0) {
+ // version 0 reads offsets from ZK
+ val responseInfo = offsetFetchRequest.requestInfo.map( t => {
+ val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
+ try {
+ ensureTopicExists(t.topic)
+ val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
+ payloadOpt match {
+ case Some(payload) => {
+ (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError))
+ }
+ case None => (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
+ ErrorMapping.UnknownTopicOrPartitionCode))
+ }
+ } catch {
+ case e: Throwable =>
+ (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
+ ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+ }
+ })
+ val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+ } else {
+ // version 1 reads offsets from Kafka
+ val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
+ metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
+ )
+ val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
+ val knownStatus =
+ if (knownTopicPartitions.size > 0)
+ offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
+ else
+ Map.empty[TopicAndPartition, OffsetMetadataAndError]
+ val status = unknownStatus ++ knownStatus
+
+ val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
+
+ trace("Sending offset fetch response %s for correlation id %d to client %s."
+ .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+ }
}
/*
http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..4e817a2 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -163,8 +163,8 @@ object SerializationTestUtils {
versionId = 0,
groupId = "group 1",
requestInfo = collection.immutable.Map(
- TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
- TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
+ TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata"),
+ TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata)
))
}