You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/10/28 01:58:12 UTC

kafka git commit: KAFKA-4349; Handle 'PreparingRebalance' and 'AwaitingSync' states in consumer group describe

Repository: kafka
Updated Branches:
  refs/heads/trunk 53fc26723 -> 34e9cc5df


KAFKA-4349; Handle 'PreparingRebalance' and 'AwaitingSync' states in consumer group describe

The edge case where consumer group state is `PreparingRebalance` or `AwaitingSync` will be separately handled as the group assignment is not yet determined.

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2070 from vahidhashemian/KAFKA-4349


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/34e9cc5d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/34e9cc5d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/34e9cc5d

Branch: refs/heads/trunk
Commit: 34e9cc5dfae1f0a7b2cab51cb2939d48ba048964
Parents: 53fc267
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Thu Oct 27 18:41:38 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Oct 27 18:41:38 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/AdminClient.scala    | 10 +++++--
 .../kafka/admin/ConsumerGroupCommand.scala      | 31 +++++++++++---------
 2 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/34e9cc5d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 22a8abb..1179557 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -149,9 +149,15 @@ class AdminClient(val time: Time,
 
     Errors.forCode(metadata.errorCode()).maybeThrow()
     val consumers = metadata.members.map { consumer =>
-      val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
-      ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, assignment.partitions.toList)
+      ConsumerSummary(consumer.memberId, consumer.clientId, consumer.clientHost, metadata.state match {
+        case "Stable" =>
+          val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(Utils.readBytes(consumer.memberAssignment)))
+          assignment.partitions.toList
+        case _ =>
+          List()
+      })
     }.toList
+
     ConsumerGroupSummary(metadata.state, metadata.protocol, Some(consumers), coordinator)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/34e9cc5d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 6300d76..a9cd6d3 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -73,20 +73,23 @@ object ConsumerGroupCommand extends Logging {
           case None =>
             printError(s"The consumer group '$groupId' does not exist.")
           case Some(assignments) =>
-            if (assignments.isEmpty)
-              state match {
-                case Some("Dead") =>
-                  printError(s"Consumer group '$groupId' does not exist.")
-                case Some("Empty") =>
-                  printError(s"Consumer group '$groupId' has no active members.")
-                case Some(_) =>
-                  printError(s"Consumer group '$groupId' is rebalancing.")
-                case None =>
-                  // the control should never reach here
-                  throw new KafkaException("Expected a valid consumer group state, but none found.")
-              }
-            else
-              printAssignment(assignments, !opts.useOldConsumer)
+            state match {
+              case Some("Dead") =>
+                printError(s"Consumer group '$groupId' does not exist.")
+              case Some("Empty") =>
+                printError(s"Consumer group '$groupId' has no active members.")
+              case Some("PreparingRebalance") | Some("AwaitingSync") =>
+                System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
+                printAssignment(assignments, !opts.useOldConsumer)
+              case Some("Stable") =>
+                printAssignment(assignments, !opts.useOldConsumer)
+              case Some(other) =>
+                // the control should never reach here
+                throw new KafkaException(s"Expected a valid consumer group state, but found '$other'.")
+              case None =>
+                // the control should never reach here
+                throw new KafkaException("Expected a valid consumer group state, but none found.")
+            }
         }
       }
       else if (opts.options.has(opts.deleteOpt)) {