You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/01 19:31:24 UTC

kafka git commit: KAFKA-4357; Fix consumer group describe output when there is no active member (old consumer)

Repository: kafka
Updated Branches:
  refs/heads/trunk fbbe5821c -> 94909a8f8


KAFKA-4357; Fix consumer group describe output when there is no active member (old consumer)

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>, Jason Gustafson <ja...@confluent.io>

Closes #2075 from vahidhashemian/KAFKA-4357


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94909a8f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94909a8f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94909a8f

Branch: refs/heads/trunk
Commit: 94909a8f83bfe214726f85130ad04d867e022894
Parents: fbbe582
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Tue Nov 1 11:36:12 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Nov 1 11:36:12 2016 -0700

----------------------------------------------------------------------
 .../kafka/admin/ConsumerGroupCommand.scala      | 35 ++++++++++----------
 .../kafka/admin/DescribeConsumerGroupTest.scala | 29 +++++++++++++++-
 2 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/94909a8f/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index a9cd6d3..b53856e 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -71,25 +71,26 @@ object ConsumerGroupCommand extends Logging {
         val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
         assignments match {
           case None =>
+            // applies to both old and new consumer
             printError(s"The consumer group '$groupId' does not exist.")
           case Some(assignments) =>
-            state match {
-              case Some("Dead") =>
-                printError(s"Consumer group '$groupId' does not exist.")
-              case Some("Empty") =>
-                printError(s"Consumer group '$groupId' has no active members.")
-              case Some("PreparingRebalance") | Some("AwaitingSync") =>
-                System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
-                printAssignment(assignments, !opts.useOldConsumer)
-              case Some("Stable") =>
-                printAssignment(assignments, !opts.useOldConsumer)
-              case Some(other) =>
-                // the control should never reach here
-                throw new KafkaException(s"Expected a valid consumer group state, but found '$other'.")
-              case None =>
-                // the control should never reach here
-                throw new KafkaException("Expected a valid consumer group state, but none found.")
-            }
+            if (opts.useOldConsumer)
+              printAssignment(assignments, false)
+            else
+              state match {
+                case Some("Dead") =>
+                  printError(s"Consumer group '$groupId' does not exist.")
+                case Some("Empty") =>
+                  printError(s"Consumer group '$groupId' has no active members.")
+                case Some("PreparingRebalance") | Some("AwaitingSync") =>
+                  System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
+                  printAssignment(assignments, true)
+                case Some("Stable") =>
+                  printAssignment(assignments, true)
+                case other =>
+                  // the control should never reach here
+                  throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
+              }
         }
       }
       else if (opts.options.has(opts.deleteOpt)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/94909a8f/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 39bcb7a..b9c760d 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -90,7 +90,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         val (_, assignments) = consumerGroupCommand.describeGroup()
         assignments.isDefined &&
         assignments.get.filter(_.group == group).size == 1 &&
-        assignments.get.filter(_.group == group).head.consumerId.isDefined
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.nonEmpty)
       }, "Expected rows and a member id column in describe group results.")
 
     // cleanup
@@ -99,6 +99,33 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
   }
 
   @Test
+  def testDescribeExistingGroupWithNoMembers() {
+    // mocks
+    val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
+
+    // stubs
+    val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
+    val consumerGroupCommand = new ZkConsumerGroupService(opts)
+
+    // simulation
+    EasyMock.replay(consumerMock)
+
+    // action/test
+    val (_, a1) = consumerGroupCommand.describeGroup() // there should be a member here
+    consumerMock.stop()
+    TestUtils.waitUntilTrue(() => {
+        val (_, assignments) = consumerGroupCommand.describeGroup()
+        assignments.isDefined &&
+        assignments.get.filter(_.group == group).size == 1 &&
+        assignments.get.filter(_.group == group).head.consumerId.isDefined &&
+        assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.isEmpty) // the member should be gone
+      }, "Expected no active member in describe group results.")
+
+    // cleanup
+    consumerGroupCommand.close()
+  }
+
+  @Test
   def testDescribeConsumersWithNoAssignedPartitions() {
     // mocks
     val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()