You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/28 23:07:37 UTC

kafka git commit: KAFKA-3158; ConsumerGroupCommand should tell whether group is actually dead

Repository: kafka
Updated Branches:
  refs/heads/trunk a4802962c -> 0aff45096


KAFKA-3158; ConsumerGroupCommand should tell whether group is actually dead

This patch fix differentiates between when a consumer group is rebalancing or dead and reports the appropriate error message.

Author: Ishita Mandhan <im...@us.ibm.com>

Reviewers: Vahid Hashemian <va...@us.ibm.com>, Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #1429 from imandhan/KAFKA-3158


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0aff4509
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0aff4509
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0aff4509

Branch: refs/heads/trunk
Commit: 0aff450961a8dd14cc7820ee8d1c9eea855439b0
Parents: a480296
Author: Ishita Mandhan <im...@us.ibm.com>
Authored: Sat May 28 23:30:10 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat May 28 23:30:10 2016 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    | 10 +++---
 .../kafka/admin/ConsumerGroupCommand.scala      | 33 +++++++++++---------
 .../integration/kafka/api/AdminClientTest.scala |  2 +-
 3 files changed, 24 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index ebb5026..8572ceb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -143,21 +143,21 @@ class AdminClient(val time: Time,
                              clientHost: String,
                              assignment: List[TopicPartition])
 
-  def describeConsumerGroup(groupId: String): List[ConsumerSummary] = {
+  def describeConsumerGroup(groupId: String): Option[List[ConsumerSummary]] = {
     val group = describeGroup(groupId)
     if (group.state == "Dead")
-      return List.empty[ConsumerSummary]
+      return None
 
     if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
       throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
 
     if (group.state == "Stable") {
-      group.members.map { member =>
+      Some(group.members.map { member =>
         val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
         new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
-      }
+      })
     } else {
-      List.empty
+      Some(List.empty)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 414e7ba..b086d8f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -312,22 +312,25 @@ object ConsumerGroupCommand {
     }
 
     protected def describeGroup(group: String) {
-      val consumerSummaries = adminClient.describeConsumerGroup(group)
-      if (consumerSummaries.isEmpty)
-        println(s"Consumer group `${group}` does not exist or is rebalancing.")
-      else {
-        val consumer = getConsumer()
-        printDescribeHeader()
-        consumerSummaries.foreach { consumerSummary =>
-          val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
-          val partitionOffsets = topicPartitions.flatMap { topicPartition =>
-            Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
-              topicPartition -> offsetAndMetadata.offset
+      adminClient.describeConsumerGroup(group) match {
+        case None => println(s"Consumer group `${group}` does not exist.")
+        case Some(consumerSummaries) =>
+          if (consumerSummaries.isEmpty)
+            println(s"Consumer group `${group}` is rebalancing.")
+          else {
+            val consumer = getConsumer()
+            printDescribeHeader()
+            consumerSummaries.foreach { consumerSummary =>
+              val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
+              val partitionOffsets = topicPartitions.flatMap { topicPartition =>
+                Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
+                  topicPartition -> offsetAndMetadata.offset
+                }
+              }.toMap
+              describeTopicPartition(group, topicPartitions, partitionOffsets.get,
+                _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
             }
-          }.toMap
-          describeTopicPartition(group, topicPartitions, partitionOffsets.get,
-            _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
-        }
+          }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 7fae81e..3d39475 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -106,7 +106,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
 
     val consumerSummaries = client.describeConsumerGroup(groupId)
     assertEquals(1, consumerSummaries.size)
-    assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet)
+    assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet))
   }
 
   @Test