You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/28 23:07:37 UTC
kafka git commit: KAFKA-3158;
ConsumerGroupCommand should tell whether group is actually dead
Repository: kafka
Updated Branches:
refs/heads/trunk a4802962c -> 0aff45096
KAFKA-3158; ConsumerGroupCommand should tell whether group is actually dead
This patch fix differentiates between when a consumer group is rebalancing or dead and reports the appropriate error message.
Author: Ishita Mandhan <im...@us.ibm.com>
Reviewers: Vahid Hashemian <va...@us.ibm.com>, Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #1429 from imandhan/KAFKA-3158
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0aff4509
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0aff4509
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0aff4509
Branch: refs/heads/trunk
Commit: 0aff450961a8dd14cc7820ee8d1c9eea855439b0
Parents: a480296
Author: Ishita Mandhan <im...@us.ibm.com>
Authored: Sat May 28 23:30:10 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat May 28 23:30:10 2016 +0100
----------------------------------------------------------------------
.../main/scala/kafka/admin/AdminClient.scala | 10 +++---
.../kafka/admin/ConsumerGroupCommand.scala | 33 +++++++++++---------
.../integration/kafka/api/AdminClientTest.scala | 2 +-
3 files changed, 24 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index ebb5026..8572ceb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -143,21 +143,21 @@ class AdminClient(val time: Time,
clientHost: String,
assignment: List[TopicPartition])
- def describeConsumerGroup(groupId: String): List[ConsumerSummary] = {
+ def describeConsumerGroup(groupId: String): Option[List[ConsumerSummary]] = {
val group = describeGroup(groupId)
if (group.state == "Dead")
- return List.empty[ConsumerSummary]
+ return None
if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE)
throw new IllegalArgumentException(s"Group ${groupId} with protocol type '${group.protocolType}' is not a valid consumer group")
if (group.state == "Stable") {
- group.members.map { member =>
+ Some(group.members.map { member =>
val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
new ConsumerSummary(member.memberId, member.clientId, member.clientHost, assignment.partitions().asScala.toList)
- }
+ })
} else {
- List.empty
+ Some(List.empty)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 414e7ba..b086d8f 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -312,22 +312,25 @@ object ConsumerGroupCommand {
}
protected def describeGroup(group: String) {
- val consumerSummaries = adminClient.describeConsumerGroup(group)
- if (consumerSummaries.isEmpty)
- println(s"Consumer group `${group}` does not exist or is rebalancing.")
- else {
- val consumer = getConsumer()
- printDescribeHeader()
- consumerSummaries.foreach { consumerSummary =>
- val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
- val partitionOffsets = topicPartitions.flatMap { topicPartition =>
- Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
- topicPartition -> offsetAndMetadata.offset
+ adminClient.describeConsumerGroup(group) match {
+ case None => println(s"Consumer group `${group}` does not exist.")
+ case Some(consumerSummaries) =>
+ if (consumerSummaries.isEmpty)
+ println(s"Consumer group `${group}` is rebalancing.")
+ else {
+ val consumer = getConsumer()
+ printDescribeHeader()
+ consumerSummaries.foreach { consumerSummary =>
+ val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
+ val partitionOffsets = topicPartitions.flatMap { topicPartition =>
+ Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
+ topicPartition -> offsetAndMetadata.offset
+ }
+ }.toMap
+ describeTopicPartition(group, topicPartitions, partitionOffsets.get,
+ _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
}
- }.toMap
- describeTopicPartition(group, topicPartitions, partitionOffsets.get,
- _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0aff4509/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index 7fae81e..3d39475 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -106,7 +106,7 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
val consumerSummaries = client.describeConsumerGroup(groupId)
assertEquals(1, consumerSummaries.size)
- assertEquals(Set(tp, tp2), consumerSummaries.head.assignment.toSet)
+ assertEquals(Some(Set(tp, tp2)), consumerSummaries.map(_.head.assignment.toSet))
}
@Test