You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/05 15:53:00 UTC

[jira] [Commented] (KAFKA-6311) Expose Kafka cluster ID in Connect REST API

    [ https://issues.apache.org/jira/browse/KAFKA-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313341#comment-16313341 ] 

ASF GitHub Bot commented on KAFKA-6311:
---------------------------------------

hachikuji closed pull request #4314: KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238)
URL: https://github.com/apache/kafka/pull/4314
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 11fe428b731..49ad8d02872 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,13 +38,41 @@
 import java.util.concurrent.TimeUnit;
 
 public class MockAdminClient extends AdminClient {
+    public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+
     private final List<Node> brokers;
     private final Map<String, TopicMetadata> allTopics = new HashMap<>();
+    private final String clusterId;
 
+    private Node controller;
     private int timeoutNextRequests = 0;
 
-    public MockAdminClient(List<Node> brokers) {
+    /**
+     * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from
+     * DEFAULT_CLUSTER_ID.
+     *
+     * @param brokers list of brokers in the cluster
+     * @param controller node that should start as the controller
+     */
+    public MockAdminClient(List<Node> brokers, Node controller) {
+        this(brokers, controller, DEFAULT_CLUSTER_ID);
+    }
+
+    /**
+     * Creates MockAdminClient for a cluster with the given brokers.
+     * @param brokers list of brokers in the cluster
+     * @param controller node that should start as the controller
+     */
+    public MockAdminClient(List<Node> brokers, Node controller, String clusterId) {
         this.brokers = brokers;
+        controller(controller);
+        this.clusterId = clusterId;
+    }
+
+    public void controller(Node controller) {
+        if (!brokers.contains(controller))
+            throw new IllegalArgumentException("The controller node must be in the list of brokers");
+        this.controller = controller;
     }
 
     public void addTopic(boolean internal,
@@ -82,7 +110,22 @@ public void timeoutNextRequest(int numberOfRequest) {
 
     @Override
     public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        KafkaFutureImpl<Collection<Node>> nodesFuture = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> brokerIdFuture = new KafkaFutureImpl<>();
+
+        if (timeoutNextRequests > 0) {
+            nodesFuture.completeExceptionally(new TimeoutException());
+            controllerFuture.completeExceptionally(new TimeoutException());
+            brokerIdFuture.completeExceptionally(new TimeoutException());
+            --timeoutNextRequests;
+        } else {
+            nodesFuture.complete(brokers);
+            controllerFuture.complete(controller);
+            brokerIdFuture.complete(clusterId);
+        }
+
+        return new DescribeClusterResult(nodesFuture, controllerFuture, brokerIdFuture);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 1b2f94e4613..98a77ed06c9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +72,8 @@ public static void main(String[] args) throws Exception {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig config = new DistributedConfig(workerProps);
 
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@@ -85,7 +88,8 @@ public static void main(String[] args) throws Exception {
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
 
-        DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
+        DistributedHerder herder = new DistributedHerder(config, time, worker,
+                kafkaClusterId, statusBackingStore, configBackingStore,
                 advertisedUrl.toString());
         final Connect connect = new Connect(herder, rest);
         log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 551c6b51d3b..17699053541 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,13 +77,15 @@ public static void main(String[] args) throws Exception {
         plugins.compareAndSwapWithDelegatingLoader();
         StandaloneConfig config = new StandaloneConfig(workerProps);
 
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
         Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
 
-        Herder herder = new StandaloneHerder(worker);
+        Herder herder = new StandaloneHerder(worker, kafkaClusterId);
         final Connect connect = new Connect(herder, rest);
         log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index fbe0ae2afb2..02465c922f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -79,6 +79,7 @@
 
     private final String workerId;
     protected final Worker worker;
+    private final String kafkaClusterId;
     protected final StatusBackingStore statusBackingStore;
     protected final ConfigBackingStore configBackingStore;
 
@@ -86,14 +87,21 @@
 
     public AbstractHerder(Worker worker,
                           String workerId,
+                          String kafkaClusterId,
                           StatusBackingStore statusBackingStore,
                           ConfigBackingStore configBackingStore) {
         this.worker = worker;
         this.workerId = workerId;
+        this.kafkaClusterId = kafkaClusterId;
         this.statusBackingStore = statusBackingStore;
         this.configBackingStore = configBackingStore;
     }
 
+    @Override
+    public String kafkaClusterId() {
+        return kafkaClusterId;
+    }
+
     protected abstract int generation();
 
     protected void startServices() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5dfb808f764..855b08a8a7f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -176,6 +176,13 @@
      */
     Plugins plugins();
 
+
+    /**
+     * Get the cluster ID of the Kafka cluster backing this Connect cluster.
+     * @return the cluster ID of the Kafka cluster backing this connect cluster
+     */
+    String kafkaClusterId();
+
     class Created<T> {
         private final boolean created;
         private final T result;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a1cc56a487b..c39ee67effc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -149,10 +149,11 @@
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
+                             String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl) {
-        this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
+        this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -160,13 +161,14 @@ public DistributedHerder(DistributedConfig config,
     DistributedHerder(DistributedConfig config,
                       Worker worker,
                       String workerId,
+                      String kafkaClusterId,
                       StatusBackingStore statusBackingStore,
                       ConfigBackingStore configBackingStore,
                       WorkerGroupMember member,
                       String restUrl,
                       ConnectMetrics metrics,
                       Time time) {
-        super(worker, workerId, statusBackingStore, configBackingStore);
+        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
 
         this.time = time;
         this.herderMetrics = new HerderMetrics(metrics);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 7afb4e80572..4a2f913cd42 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -100,7 +100,7 @@ public void start(Herder herder) {
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
-        resourceConfig.register(RootResource.class);
+        resourceConfig.register(new RootResource(herder));
         resourceConfig.register(new ConnectorsResource(herder));
         resourceConfig.register(new ConnectorPluginsResource(herder));
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
index 25ce7311bac..a12751c7ae9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
@@ -20,12 +20,14 @@
 import org.apache.kafka.common.utils.AppInfoParser;
 
 public class ServerInfo {
-    private String version;
-    private String commit;
+    private final String version;
+    private final String commit;
+    private final String kafkaClusterId;
 
-    public ServerInfo() {
-        version = AppInfoParser.getVersion();
-        commit = AppInfoParser.getCommitId();
+    public ServerInfo(String kafkaClusterId) {
+        this.version = AppInfoParser.getVersion();
+        this.commit = AppInfoParser.getCommitId();
+        this.kafkaClusterId = kafkaClusterId;
     }
 
     @JsonProperty
@@ -37,4 +39,9 @@ public String version() {
     public String commit() {
         return commit;
     }
+
+    @JsonProperty("kafka_cluster_id")
+    public String clusterId() {
+        return kafkaClusterId;
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index 9d94001ecfa..9666bf15954 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;
 
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 
 import javax.ws.rs.GET;
@@ -27,9 +28,15 @@
 @Produces(MediaType.APPLICATION_JSON)
 public class RootResource {
 
+    private final Herder herder;
+
+    public RootResource(Herder herder) {
+        this.herder = herder;
+    }
+
     @GET
     @Path("/")
     public ServerInfo serverInfo() {
-        return new ServerInfo();
+        return new ServerInfo(herder.kafkaClusterId());
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index e9ec0f9092f..41609cb054d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -52,16 +52,17 @@
 
     private ClusterConfigState configState;
 
-    public StandaloneHerder(Worker worker) {
-        this(worker, worker.workerId(), new MemoryStatusBackingStore(), new MemoryConfigBackingStore());
+    public StandaloneHerder(Worker worker, String kafkaClusterId) {
+        this(worker, worker.workerId(), kafkaClusterId, new MemoryStatusBackingStore(), new MemoryConfigBackingStore());
     }
 
     // visible for testing
     StandaloneHerder(Worker worker,
                      String workerId,
+                     String kafkaClusterId,
                      StatusBackingStore statusBackingStore,
                      MemoryConfigBackingStore configBackingStore) {
-        super(worker, workerId, statusBackingStore, configBackingStore);
+        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
         this.configState = ClusterConfigState.EMPTY;
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 913ae1f1058..19452047fcb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -16,10 +16,20 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
 
 public final class ConnectUtils {
+    private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
+
     public static Long checkAndConvertTimestamp(Long timestamp) {
         if (timestamp == null || timestamp >= 0)
             return timestamp;
@@ -28,4 +38,28 @@ else if (timestamp == RecordBatch.NO_TIMESTAMP)
         else
             throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp));
     }
+
+    public static String lookupKafkaClusterId(WorkerConfig config) {
+        try (AdminClient adminClient = AdminClient.create(config.originals())) {
+            return lookupKafkaClusterId(adminClient);
+        }
+    }
+
+    static String lookupKafkaClusterId(AdminClient adminClient) {
+        log.debug("Looking up Kafka cluster ID");
+        try {
+            KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
+            if (clusterIdFuture == null) {
+                log.info("Kafka cluster version is too old to return cluster ID");
+                return null;
+            }
+            String kafkaClusterId = clusterIdFuture.get();
+            log.info("Kafka cluster ID: {}", kafkaClusterId);
+            return kafkaClusterId;
+        } catch (InterruptedException e) {
+            throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
+        } catch (ExecutionException e) {
+            throw new ConnectException("Failed to connect to and describe Kafka cluster", e);
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index ac9c312823c..500d31f42e0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -51,6 +51,7 @@
 public class AbstractHerderTest extends EasyMockSupport {
     private final Worker worker = strictMock(Worker.class);
     private final String workerId = "workerId";
+    private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
     private final int generation = 5;
     private final String connector = "connector";
     private final Plugins plugins = strictMock(Plugins.class);
@@ -64,8 +65,8 @@ public void connectorStatus() {
         StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
                 .addMockedMethod("generation")
                 .createMock();
 
@@ -107,8 +108,8 @@ public void taskStatus() {
         StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
                 .addMockedMethod("generation")
                 .createMock();
 
@@ -228,8 +229,8 @@ private AbstractHerder createConfigValidationHerder(Class<? extends Connector> c
         StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
                 .addMockedMethod("generation")
                 .createMock();
         EasyMock.expect(herder.generation()).andStubReturn(generation);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d41ccbebae0..d7307cfdbc4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -149,6 +149,7 @@
             TASK_CONFIGS_MAP, Collections.<String>emptySet());
 
     private static final String WORKER_ID = "localhost:8083";
+    private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
 
     @Mock private ConfigBackingStore configBackingStore;
     @Mock private StatusBackingStore statusBackingStore;
@@ -178,8 +179,10 @@ public void setUp() throws Exception {
         worker = PowerMock.createMock(Worker.class);
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
 
-        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
-                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
+        herder = PowerMock.createPartialMock(DistributedHerder.class,
+                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
+                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
+                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
new file mode 100644
index 00000000000..4e928a37037
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(EasyMockRunner.class)
+public class RootResourceTest extends EasyMockSupport {
+
+    @Mock
+    private Herder herder;
+    private RootResource rootResource;
+
+    @Before
+    public void setUp() {
+        rootResource = new RootResource(herder);
+    }
+
+    @Test
+    public void testRootGet() {
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(MockAdminClient.DEFAULT_CLUSTER_ID);
+
+        replayAll();
+
+        ServerInfo info = rootResource.serverInfo();
+        assertEquals(AppInfoParser.getVersion(), info.version());
+        assertEquals(AppInfoParser.getCommitId(), info.commit());
+        assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, info.clusterId());
+
+        verifyAll();
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 18d2739f13e..79be45b3290 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -89,6 +89,7 @@
     private static final String TOPICS_LIST_STR = "topic1,topic2";
     private static final int DEFAULT_MAX_TASKS = 1;
     private static final String WORKER_ID = "localhost:8083";
+    private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
 
     private enum SourceSink {
         SOURCE, SINK
@@ -110,7 +111,7 @@
     public void setup() {
         worker = PowerMock.createMock(Worker.class);
         herder = PowerMock.createPartialMock(StandaloneHerder.class, new String[]{"connectorTypeForClass"},
-            worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
+            worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore());
         plugins = PowerMock.createMock(Plugins.class);
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
new file mode 100644
index 00000000000..6be3525380b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectUtilsTest {
+
+    @Test
+    public void testLookupKafkaClusterId() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient(cluster, broker1);
+
+        assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(adminClient));
+    }
+
+    @Test
+    public void testLookupNullKafkaClusterId() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient(cluster, broker1, null);
+
+        assertNull(ConnectUtils.lookupKafkaClusterId(adminClient));
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testLookupKafkaClusterIdTimeout() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient(cluster, broker1);
+        adminClient.timeoutNextRequest(1);
+
+        ConnectUtils.lookupKafkaClusterId(adminClient);
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 10d3dd94825..8afbde5eaec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -68,7 +68,7 @@
 
     @Before
     public void init() {
-        mockAdminClient = new MockAdminClient(cluster);
+        mockAdminClient = new MockAdminClient(cluster, broker1);
         internalTopicManager = new InternalTopicManager(
             mockAdminClient,
             config);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Expose Kafka cluster ID in Connect REST API
> -------------------------------------------
>
>                 Key: KAFKA-6311
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6311
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Xavier Léauté
>            Assignee: Ewen Cheslack-Postava
>              Labels: needs-kip
>             Fix For: 1.1.0
>
>
> Connect currently does not expose any information about the Kafka cluster it is connected to.
> In an environment with multiple Kafka clusters it would be useful to know which cluster Connect is talking to. Exposing this information enables programmatic discovery of Kafka cluster metadata for the purpose of configuring connectors.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)