You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/09 01:10:57 UTC
kafka git commit: KAFKA-5176;
AdminClient: add controller and clusterId methods to
DescribeClusterResults
Repository: kafka
Updated Branches:
refs/heads/trunk 59b918ec2 -> 78ace3725
KAFKA-5176; AdminClient: add controller and clusterId methods to DescribeClusterResults
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: dan norwood <no...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #2977 from cmccabe/KAFKA-5176
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78ace372
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78ace372
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78ace372
Branch: refs/heads/trunk
Commit: 78ace3725151dc056e83a8ec389141fc8809d4c0
Parents: 59b918e
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Tue May 9 02:10:24 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 9 02:10:31 2017 +0100
----------------------------------------------------------------------
.../clients/admin/DescribeClusterResults.java | 30 +++++++++++++++++---
.../kafka/clients/admin/KafkaAdminClient.java | 9 +++++-
.../api/KafkaAdminClientIntegrationTest.scala | 8 +++++-
3 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
index 5ee834b..a51c1c8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java
@@ -28,16 +28,38 @@ import java.util.Collection;
*/
@InterfaceStability.Unstable
public class DescribeClusterResults {
- private final KafkaFuture<Collection<Node>> future;
+ private final KafkaFuture<Collection<Node>> nodes;
+ private final KafkaFuture<Node> controller;
+ private final KafkaFuture<String> clusterId;
- DescribeClusterResults(KafkaFuture<Collection<Node>> future) {
- this.future = future;
+ DescribeClusterResults(KafkaFuture<Collection<Node>> nodes,
+ KafkaFuture<Node> controller,
+ KafkaFuture<String> clusterId) {
+ this.nodes = nodes;
+ this.controller = controller;
+ this.clusterId = clusterId;
}
/**
* Returns a future which yields a collection of nodes.
*/
public KafkaFuture<Collection<Node>> nodes() {
- return future;
+ return nodes;
+ }
+
+ /**
+ * Returns a future which yields the current controller id.
+ * Note that this may yield null, if the controller ID is not yet known.
+ */
+ public KafkaFuture<Node> controller() {
+ return controller;
+ }
+
+ /**
+ * Returns a future which yields the current cluster Id.
+ * Note that this may yield null, if the cluster version is too old.
+ */
+ public KafkaFuture<String> clusterId() {
+ return clusterId;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index ad921f8..ec10232 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1011,6 +1011,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DescribeClusterResults describeCluster(DescribeClusterOptions options) {
final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
+ final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
+ final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@@ -1024,14 +1026,19 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse) abstractResponse;
describeClusterFuture.complete(response.brokers());
+ controllerFuture.complete(response.controller());
+ clusterIdFuture.complete(response.clusterId());
}
@Override
void handleFailure(Throwable throwable) {
describeClusterFuture.completeExceptionally(throwable);
+ controllerFuture.completeExceptionally(throwable);
+ clusterIdFuture.completeExceptionally(throwable);
}
}, now);
- return new DescribeClusterResults(describeClusterFuture);
+
+ return new DescribeClusterResults(describeClusterFuture, controllerFuture, clusterIdFuture);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index 455ab61..07eb673 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.{After, Before, Rule, Test}
+import org.apache.kafka.common.requests.MetadataResponse
import org.junit.rules.Timeout
import org.junit.Assert._
@@ -135,9 +136,14 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
}
@Test
- def testGetAllBrokerVersions(): Unit = {
+ def testGetAllBrokerVersionsAndDescribeCluster(): Unit = {
client = AdminClient.create(createConfig())
val nodes = client.describeCluster().nodes().get()
+ val clusterId = client.describeCluster().clusterId().get()
+ assertEquals(servers.head.apis.clusterId, clusterId)
+ val controller = client.describeCluster().controller().get()
+ assertEquals(servers.head.apis.metadataCache.getControllerId.
+ getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
val nodesToVersions = client.apiVersions(nodes).all().get()
val brokers = brokerList.split(",")
assert(brokers.size == nodesToVersions.size())