You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/05/15 23:42:09 UTC

git commit: KAFKA-1437; Consumer metadata response should include (empty) coordinator information if the coordinator is unavailable; reviewed by Neha Narkhede and Guozhang Wang.

Repository: kafka
Updated Branches:
  refs/heads/trunk 31e32b386 -> c179c45f2


KAFKA-1437; Consumer metadata response should include (empty) coordinator information if the coordinator is unavailable; reviewed by Neha Narkhede and Guozhang Wang.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c179c45f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c179c45f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c179c45f

Branch: refs/heads/trunk
Commit: c179c45f2375cb1253e519d0be8c8e9e6ff679f6
Parents: 31e32b3
Author: Joel Koshy <jj...@gmail.com>
Authored: Tue May 6 10:55:44 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Thu May 15 14:41:29 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/api/ConsumerMetadataResponse.scala   | 10 +++++-----
 core/src/main/scala/kafka/client/ClientUtils.scala        |  2 +-
 .../scala/kafka/javaapi/ConsumerMetadataResponse.scala    |  2 +-
 core/src/test/scala/other/kafka/TestOffsetManager.scala   |  2 +-
 .../unit/kafka/api/RequestResponseSerializationTest.scala |  3 ++-
 .../test/scala/unit/kafka/server/OffsetCommitTest.scala   |  2 +-
 6 files changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
index 6807f98..f8cf6c3 100644
--- a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -23,6 +23,8 @@ import kafka.common.ErrorMapping
 
 object ConsumerMetadataResponse {
   val CurrentVersion = 0
+
+  private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))
   
   def readFrom(buffer: ByteBuffer) = {
     val correlationId = buffer.getInt
@@ -37,20 +39,18 @@ object ConsumerMetadataResponse {
   
 }
 
-case class ConsumerMetadataResponse (coordinator: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
+case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
   extends RequestOrResponse(correlationId = correlationId) {
 
   def sizeInBytes =
     4 + /* correlationId */
     2 + /* error code */
-    coordinator.map(_.sizeInBytes).getOrElse(0)
+    coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerOpt).get.sizeInBytes
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
     buffer.putShort(errorCode)
-    if (errorCode == ErrorMapping.NoError) {
-      coordinator.get.writeTo(buffer)
-    }
+    coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer))
   }
 
   def describe(details: Boolean) = toString

http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index fc9e084..ba5fbdc 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -159,7 +159,7 @@ object ClientUtils extends Logging{
            val consumerMetadataResponse =  ConsumerMetadataResponse.readFrom(response.buffer)
            debug("Consumer metadata response: " + consumerMetadataResponse.toString)
            if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
-             coordinatorOpt = consumerMetadataResponse.coordinator
+             coordinatorOpt = consumerMetadataResponse.coordinatorOpt
          }
          catch {
            case ioe: IOException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
index dfa9c42..1b28861 100644
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -25,7 +25,7 @@ class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadat
 
   def coordinator: Broker = {
     import kafka.javaapi.Implicits._
-    underlying.coordinator
+    underlying.coordinatorOpt
   }
 
   override def equals(other: Any) = canEqual(other) && {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index c468419..41f334d 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -119,7 +119,7 @@ object TestOffsetManager {
       val group = "group-" + id
       try {
         metadataChannel.send(ConsumerMetadataRequest(group))
-        val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinator.map(_.id).getOrElse(-1)
+        val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinatorOpt.map(_.id).getOrElse(-1)
 
         val channel = if (channels.contains(coordinatorId))
           channels(coordinatorId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index d39a9a4..a2117b3 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -200,6 +200,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
+  private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode)
 
   @Test
   def testSerializationAndDeserialization() {
@@ -213,7 +214,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
                                topicMetadataRequest, topicMetadataResponse,
                                offsetCommitRequest, offsetCommitResponse,
                                offsetFetchRequest, offsetFetchResponse,
-                               consumerMetadataRequest, consumerMetadataResponse)
+                               consumerMetadataRequest, consumerMetadataResponse, consumerMetadataResponseNoCoordinator)
 
     requestsAndResponses.foreach { original =>
       val buffer = ByteBuffer.allocate(original.sizeInBytes)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c179c45f/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 19a8635..2d93250 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -54,7 +54,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val consumerMetadataRequest = ConsumerMetadataRequest(group)
     Stream.continually {
       val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest)
-      consumerMetadataResponse.coordinator.isDefined
+      consumerMetadataResponse.coordinatorOpt.isDefined
     }.dropWhile(success => {
       if (!success) Thread.sleep(1000)
       !success