You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/10/15 06:47:12 UTC
git commit: KAFKA-1637 SimpleConsumer.fetchOffset returns wrong error
code when no offset exists for topic/partition/consumer group;
reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 22643bfc2 -> de432a09e
KAFKA-1637 SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de432a09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de432a09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de432a09
Branch: refs/heads/trunk
Commit: de432a09e632f78df9e580b51277f81582c3f026
Parents: 22643bf
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Tue Oct 14 21:46:53 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Oct 14 21:47:02 2014 -0700
----------------------------------------------------------------------
.../kafka/common/OffsetMetadataAndError.scala | 1 +
core/src/main/scala/kafka/server/KafkaApis.scala | 8 +++++++-
.../unit/kafka/server/OffsetCommitTest.scala | 19 ++++++++++++++++---
3 files changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/de432a09/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 1586243..4cabffe 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -51,5 +51,6 @@ object OffsetMetadataAndError {
val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError)
val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode)
val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
+ val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/de432a09/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 67f2833..6ad64d2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -505,7 +505,13 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetFetchRequest(request: RequestChannel.Request) {
val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
- val status = offsetManager.getOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap
+ // Missing
+ val (missingTopicPartitions, availableTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
+ replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition).isEmpty
+ )
+ val missingStatus = missingTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
+ val availableStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, availableTopicPartitions).toMap
+ val status = missingStatus ++ availableStatus
val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/de432a09/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 2d93250..8c5364f 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -116,7 +116,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
val topic1 = "topic-1"
val topic2 = "topic-2"
val topic3 = "topic-3"
- val topic4 = "topic-4"
+ val topic4 = "topic-4" // Topic that group never consumes
+ val topic5 = "topic-5" // Non-existent topic
+
+ createTopic(zkClient, topic1, servers = Seq(server), numPartitions = 1)
+ createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 2)
+ createTopic(zkClient, topic3, servers = Seq(server), numPartitions = 1)
+ createTopic(zkClient, topic4, servers = Seq(server), numPartitions = 1)
val commitRequest = OffsetCommitRequest("test-group", immutable.Map(
TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"),
@@ -136,7 +142,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
TopicAndPartition(topic3, 0),
TopicAndPartition(topic2, 1),
TopicAndPartition(topic3, 1), // An unknown partition
- TopicAndPartition(topic4, 0) // An unknown topic
+ TopicAndPartition(topic4, 0), // An unused topic
+ TopicAndPartition(topic5, 0) // An unknown topic
))
val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
@@ -144,8 +151,12 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
- assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
+ assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
+ assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
+ assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
+ assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)
+ assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get)
assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)
@@ -153,6 +164,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
+ assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.metadata)
assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset)
assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset)
@@ -160,6 +172,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset)
assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
+ assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset)
}
@Test