You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/01 19:31:24 UTC
kafka git commit: KAFKA-4357;
Fix consumer group describe output when there is no active member
(old consumer)
Repository: kafka
Updated Branches:
refs/heads/trunk fbbe5821c -> 94909a8f8
KAFKA-4357; Fix consumer group describe output when there is no active member (old consumer)
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>, Jason Gustafson <ja...@confluent.io>
Closes #2075 from vahidhashemian/KAFKA-4357
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94909a8f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94909a8f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94909a8f
Branch: refs/heads/trunk
Commit: 94909a8f83bfe214726f85130ad04d867e022894
Parents: fbbe582
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Tue Nov 1 11:36:12 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Nov 1 11:36:12 2016 -0700
----------------------------------------------------------------------
.../kafka/admin/ConsumerGroupCommand.scala | 35 ++++++++++----------
.../kafka/admin/DescribeConsumerGroupTest.scala | 29 +++++++++++++++-
2 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/94909a8f/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index a9cd6d3..b53856e 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -71,25 +71,26 @@ object ConsumerGroupCommand extends Logging {
val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
assignments match {
case None =>
+ // applies to both old and new consumer
printError(s"The consumer group '$groupId' does not exist.")
case Some(assignments) =>
- state match {
- case Some("Dead") =>
- printError(s"Consumer group '$groupId' does not exist.")
- case Some("Empty") =>
- printError(s"Consumer group '$groupId' has no active members.")
- case Some("PreparingRebalance") | Some("AwaitingSync") =>
- System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
- printAssignment(assignments, !opts.useOldConsumer)
- case Some("Stable") =>
- printAssignment(assignments, !opts.useOldConsumer)
- case Some(other) =>
- // the control should never reach here
- throw new KafkaException(s"Expected a valid consumer group state, but found '$other'.")
- case None =>
- // the control should never reach here
- throw new KafkaException("Expected a valid consumer group state, but none found.")
- }
+ if (opts.useOldConsumer)
+ printAssignment(assignments, false)
+ else
+ state match {
+ case Some("Dead") =>
+ printError(s"Consumer group '$groupId' does not exist.")
+ case Some("Empty") =>
+ printError(s"Consumer group '$groupId' has no active members.")
+ case Some("PreparingRebalance") | Some("AwaitingSync") =>
+ System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
+ printAssignment(assignments, true)
+ case Some("Stable") =>
+ printAssignment(assignments, true)
+ case other =>
+ // the control should never reach here
+ throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
+ }
}
}
else if (opts.options.has(opts.deleteOpt)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/94909a8f/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 39bcb7a..b9c760d 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -90,7 +90,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
val (_, assignments) = consumerGroupCommand.describeGroup()
assignments.isDefined &&
assignments.get.filter(_.group == group).size == 1 &&
- assignments.get.filter(_.group == group).head.consumerId.isDefined
+ assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.nonEmpty)
}, "Expected rows and a member id column in describe group results.")
// cleanup
@@ -99,6 +99,33 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
}
@Test
+ def testDescribeExistingGroupWithNoMembers() {
+ // mocks
+ val consumerMock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()
+
+ // stubs
+ val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group))
+ val consumerGroupCommand = new ZkConsumerGroupService(opts)
+
+ // simulation
+ EasyMock.replay(consumerMock)
+
+ // action/test
+ val (_, a1) = consumerGroupCommand.describeGroup() // there should be a member here
+ consumerMock.stop()
+ TestUtils.waitUntilTrue(() => {
+ val (_, assignments) = consumerGroupCommand.describeGroup()
+ assignments.isDefined &&
+ assignments.get.filter(_.group == group).size == 1 &&
+ assignments.get.filter(_.group == group).head.consumerId.isDefined &&
+ assignments.get.filter(_.group == group).head.consumerId.exists(_.trim.isEmpty) // the member should be gone
+ }, "Expected no active member in describe group results.")
+
+ // cleanup
+ consumerGroupCommand.close()
+ }
+
+ @Test
def testDescribeConsumersWithNoAssignedPartitions() {
// mocks
val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock()