You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/09 01:10:57 UTC

kafka git commit: KAFKA-5176; AdminClient: add controller and clusterId methods to DescribeClusterResults

Repository: kafka
Updated Branches:
  refs/heads/trunk 59b918ec2 -> 78ace3725


KAFKA-5176; AdminClient: add controller and clusterId methods to DescribeClusterResults

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: dan norwood <no...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #2977 from cmccabe/KAFKA-5176


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78ace372
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78ace372
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78ace372

Branch: refs/heads/trunk
Commit: 78ace3725151dc056e83a8ec389141fc8809d4c0
Parents: 59b918e
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Tue May 9 02:10:24 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 9 02:10:31 2017 +0100

----------------------------------------------------------------------
 .../clients/admin/DescribeClusterResults.java   | 30 +++++++++++++++++---
 .../kafka/clients/admin/KafkaAdminClient.java   |  9 +++++-
 .../api/KafkaAdminClientIntegrationTest.scala   |  8 +++++-
 3 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
index 5ee834b..a51c1c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
@@ -28,16 +28,38 @@ import java.util.Collection;
  */
 @InterfaceStability.Unstable
 public class DescribeClusterResults {
-    private final KafkaFuture<Collection<Node>> future;
+    private final KafkaFuture<Collection<Node>> nodes;
+    private final KafkaFuture<Node> controller;
+    private final KafkaFuture<String> clusterId;
 
-    DescribeClusterResults(KafkaFuture<Collection<Node>> future) {
-        this.future = future;
+    DescribeClusterResults(KafkaFuture<Collection<Node>> nodes,
+                           KafkaFuture<Node> controller,
+                           KafkaFuture<String> clusterId) {
+        this.nodes = nodes;
+        this.controller = controller;
+        this.clusterId = clusterId;
     }
 
     /**
      * Returns a future which yields a collection of nodes.
      */
     public KafkaFuture<Collection<Node>> nodes() {
-        return future;
+        return nodes;
+    }
+
+    /**
+     * Returns a future which yields the current controller node.
+     * Note that this may yield null, if the controller is not yet known.
+     */
+    public KafkaFuture<Node> controller() {
+        return controller;
+    }
+
+    /**
+     * Returns a future which yields the current cluster Id.
+     * Note that this may yield null, if the cluster version is too old.
+     */
+    public KafkaFuture<String> clusterId() {
+        return clusterId;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ad921f8..ec10232 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1011,6 +1011,8 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public DescribeClusterResults describeCluster(DescribeClusterOptions options) {
         final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
+        final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
+        final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
@@ -1024,14 +1026,19 @@ public class KafkaAdminClient extends AdminClient {
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse response = (MetadataResponse) abstractResponse;
                 describeClusterFuture.complete(response.brokers());
+                controllerFuture.complete(response.controller());
+                clusterIdFuture.complete(response.clusterId());
             }
 
             @Override
             void handleFailure(Throwable throwable) {
                 describeClusterFuture.completeExceptionally(throwable);
+                controllerFuture.completeExceptionally(throwable);
+                clusterIdFuture.completeExceptionally(throwable);
             }
         }, now);
-        return new DescribeClusterResults(describeClusterFuture);
+
+        return new DescribeClusterResults(describeClusterFuture, controllerFuture, clusterIdFuture);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index 455ab61..07eb673 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.KafkaFuture
 import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{After, Before, Rule, Test}
+import org.apache.kafka.common.requests.MetadataResponse
 import org.junit.rules.Timeout
 import org.junit.Assert._
 
@@ -135,9 +136,14 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
   }
 
   @Test
-  def testGetAllBrokerVersions(): Unit = {
+  def testGetAllBrokerVersionsAndDescribeCluster(): Unit = {
     client = AdminClient.create(createConfig())
     val nodes = client.describeCluster().nodes().get()
+    val clusterId = client.describeCluster().clusterId().get()
+    assertEquals(servers.head.apis.clusterId, clusterId)
+    val controller = client.describeCluster().controller().get()
+    assertEquals(servers.head.apis.metadataCache.getControllerId.
+      getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
     val nodesToVersions = client.apiVersions(nodes).all().get()
     val brokers = brokerList.split(",")
     assert(brokers.size == nodesToVersions.size())