You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/05/15 23:42:09 UTC
git commit: KAFKA-1437;
Consumer metadata response should include (empty) coordinator
information if the coordinator is unavailable;
reviewed by Neha Narkhede and Guozhang Wang.
Repository: kafka
Updated Branches:
refs/heads/trunk 31e32b386 -> c179c45f2
KAFKA-1437; Consumer metadata response should include (empty) coordinator information if the coordinator is unavailable; reviewed by Neha Narkhede and Guozhang Wang.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c179c45f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c179c45f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c179c45f
Branch: refs/heads/trunk
Commit: c179c45f2375cb1253e519d0be8c8e9e6ff679f6
Parents: 31e32b3
Author: Joel Koshy <jj...@gmail.com>
Authored: Tue May 6 10:55:44 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu May 15 14:41:29 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/api/ConsumerMetadataResponse.scala | 10 +++++-----
core/src/main/scala/kafka/client/ClientUtils.scala | 2 +-
.../scala/kafka/javaapi/ConsumerMetadataResponse.scala | 2 +-
core/src/test/scala/other/kafka/TestOffsetManager.scala | 2 +-
.../unit/kafka/api/RequestResponseSerializationTest.scala | 3 ++-
.../test/scala/unit/kafka/server/OffsetCommitTest.scala | 2 +-
6 files changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index 6807f98..f8cf6c3 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -23,6 +23,8 @@ import kafka.common.ErrorMapping
object ConsumerMetadataResponse {
val CurrentVersion = 0
+
+ private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))
def readFrom(buffer: ByteBuffer) = {
val correlationId = buffer.getInt
@@ -37,20 +39,18 @@ object ConsumerMetadataResponse {
}
-case class ConsumerMetadataResponse (coordinator: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
+case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
extends RequestOrResponse(correlationId = correlationId) {
def sizeInBytes =
4 + /* correlationId */
2 + /* error code */
- coordinator.map(_.sizeInBytes).getOrElse(0)
+ coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerOpt).get.sizeInBytes
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
buffer.putShort(errorCode)
- if (errorCode == ErrorMapping.NoError) {
- coordinator.get.writeTo(buffer)
- }
+ coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer))
}
def describe(details: Boolean) = toString
http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fc9e084..ba5fbdc 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -159,7 +159,7 @@ object ClientUtils extends Logging{
val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
debug("Consumer metadata response: " + consumerMetadataResponse.toString)
if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
- coordinatorOpt = consumerMetadataResponse.coordinator
+ coordinatorOpt = consumerMetadataResponse.coordinatorOpt
}
catch {
case ioe: IOException =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
index dfa9c42..1b28861 100644
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -25,7 +25,7 @@ class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadat
def coordinator: Broker = {
import kafka.javaapi.Implicits._
- underlying.coordinator
+ underlying.coordinatorOpt
}
override def equals(other: Any) = canEqual(other) && {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index c468419..41f334d 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -119,7 +119,7 @@ object TestOffsetManager {
val group = "group-" + id
try {
metadataChannel.send(ConsumerMetadataRequest(group))
- val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinator.map(_.id).getOrElse(-1)
+ val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinatorOpt.map(_.id).getOrElse(-1)
val channel = if (channels.contains(coordinatorId))
channels(coordinatorId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index d39a9a4..a2117b3 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -200,6 +200,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
+ private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode)
@Test
def testSerializationAndDeserialization() {
@@ -213,7 +214,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
topicMetadataRequest, topicMetadataResponse,
offsetCommitRequest, offsetCommitResponse,
offsetFetchRequest, offsetFetchResponse,
- consumerMetadataRequest, consumerMetadataResponse)
+ consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator)
requestsAndResponses.foreach { original =>
val buffer = ByteBuffer.allocate(original.sizeInBytes)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 19a8635..2d93250 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -54,7 +54,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
val consumerMetadataRequest = ConsumerMetadataRequest(group)
Stream.continually {
val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest)
- consumerMetadataResponse.coordinator.isDefined
+ consumerMetadataResponse.coordinatorOpt.isDefined
}.dropWhile(success => {
if (!success) Thread.sleep(1000)
!success