You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/05 15:52:53 UTC

[kafka] branch trunk updated: KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) (#4314)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bfb272c  KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) (#4314)
bfb272c is described below

commit bfb272c5cdb227040d862f0ab7337df68f787764
Author: Ewen Cheslack-Postava <me...@ewencp.org>
AuthorDate: Fri Jan 5 07:52:50 2018 -0800

    KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) (#4314)
    
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/admin/MockAdminClient.java       | 47 +++++++++++++++-
 .../kafka/connect/cli/ConnectDistributed.java      |  6 ++-
 .../kafka/connect/cli/ConnectStandalone.java       |  5 +-
 .../kafka/connect/runtime/AbstractHerder.java      |  8 +++
 .../org/apache/kafka/connect/runtime/Herder.java   |  7 +++
 .../runtime/distributed/DistributedHerder.java     |  6 ++-
 .../kafka/connect/runtime/rest/RestServer.java     |  2 +-
 .../connect/runtime/rest/entities/ServerInfo.java  | 17 ++++--
 .../runtime/rest/resources/RootResource.java       |  9 +++-
 .../runtime/standalone/StandaloneHerder.java       |  7 +--
 .../apache/kafka/connect/util/ConnectUtils.java    | 34 ++++++++++++
 .../kafka/connect/runtime/AbstractHerderTest.java  | 13 ++---
 .../runtime/distributed/DistributedHerderTest.java |  7 ++-
 .../runtime/rest/resources/RootResourceTest.java   | 58 ++++++++++++++++++++
 .../runtime/standalone/StandaloneHerderTest.java   |  3 +-
 .../kafka/connect/util/ConnectUtilsTest.java       | 63 ++++++++++++++++++++++
 .../internals/InternalTopicManagerTest.java        |  2 +-
 17 files changed, 268 insertions(+), 26 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index d00bad0..b6a5888 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,13 +38,41 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 public class MockAdminClient extends AdminClient {
+    public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+
     private final List<Node> brokers;
     private final Map<String, TopicMetadata> allTopics = new HashMap<>();
+    private final String clusterId;
 
+    private Node controller;
     private int timeoutNextRequests = 0;
 
-    public MockAdminClient(List<Node> brokers) {
+    /**
+     * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from
+     * DEFAULT_CLUSTER_ID.
+     *
+     * @param brokers list of brokers in the cluster
+     * @param controller node that should start as the controller
+     */
+    public MockAdminClient(List<Node> brokers, Node controller) {
+        this(brokers, controller, DEFAULT_CLUSTER_ID);
+    }
+
+    /**
+     * Creates MockAdminClient for a cluster with the given brokers and the given Kafka cluster ID.
+     * @param brokers list of brokers in the cluster
+     * @param controller node that should start as the controller
+     */
+    public MockAdminClient(List<Node> brokers, Node controller, String clusterId) {
         this.brokers = brokers;
+        controller(controller);
+        this.clusterId = clusterId;
+    }
+
+    public void controller(Node controller) {
+        if (!brokers.contains(controller))
+            throw new IllegalArgumentException("The controller node must be in the list of brokers");
+        this.controller = controller;
     }
 
     public void addTopic(boolean internal,
@@ -82,7 +110,22 @@ public class MockAdminClient extends AdminClient {
 
     @Override
     public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        KafkaFutureImpl<Collection<Node>> nodesFuture = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
+        KafkaFutureImpl<String> brokerIdFuture = new KafkaFutureImpl<>();
+
+        if (timeoutNextRequests > 0) {
+            nodesFuture.completeExceptionally(new TimeoutException());
+            controllerFuture.completeExceptionally(new TimeoutException());
+            brokerIdFuture.completeExceptionally(new TimeoutException());
+            --timeoutNextRequests;
+        } else {
+            nodesFuture.complete(brokers);
+            controllerFuture.complete(controller);
+            brokerIdFuture.complete(clusterId);
+        }
+
+        return new DescribeClusterResult(nodesFuture, controllerFuture, brokerIdFuture);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 1b2f94e..98a77ed 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +72,8 @@ public class ConnectDistributed {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig config = new DistributedConfig(workerProps);
 
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@@ -85,7 +88,8 @@ public class ConnectDistributed {
 
         ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
 
-        DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
+        DistributedHerder herder = new DistributedHerder(config, time, worker,
+                kafkaClusterId, statusBackingStore, configBackingStore,
                 advertisedUrl.toString());
         final Connect connect = new Connect(herder, rest);
         log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 551c6b5..1769905 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,13 +77,15 @@ public class ConnectStandalone {
         plugins.compareAndSwapWithDelegatingLoader();
         StandaloneConfig config = new StandaloneConfig(workerProps);
 
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
         Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
 
-        Herder herder = new StandaloneHerder(worker);
+        Herder herder = new StandaloneHerder(worker, kafkaClusterId);
         final Connect connect = new Connect(herder, rest);
         log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index fbe0ae2..02465c9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -79,6 +79,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
     private final String workerId;
     protected final Worker worker;
+    private final String kafkaClusterId;
     protected final StatusBackingStore statusBackingStore;
     protected final ConfigBackingStore configBackingStore;
 
@@ -86,14 +87,21 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
 
     public AbstractHerder(Worker worker,
                           String workerId,
+                          String kafkaClusterId,
                           StatusBackingStore statusBackingStore,
                           ConfigBackingStore configBackingStore) {
         this.worker = worker;
         this.workerId = workerId;
+        this.kafkaClusterId = kafkaClusterId;
         this.statusBackingStore = statusBackingStore;
         this.configBackingStore = configBackingStore;
     }
 
+    @Override
+    public String kafkaClusterId() {
+        return kafkaClusterId;
+    }
+
     protected abstract int generation();
 
     protected void startServices() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5dfb808..855b08a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -176,6 +176,13 @@ public interface Herder {
      */
     Plugins plugins();
 
+
+    /**
+     * Get the cluster ID of the Kafka cluster backing this Connect cluster.
+     * @return the cluster ID of the Kafka cluster backing this Connect cluster
+     */
+    String kafkaClusterId();
+
     class Created<T> {
         private final boolean created;
         private final T result;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a1cc56a..c39ee67 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -149,10 +149,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
+                             String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl) {
-        this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
+        this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -160,13 +161,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     DistributedHerder(DistributedConfig config,
                       Worker worker,
                       String workerId,
+                      String kafkaClusterId,
                       StatusBackingStore statusBackingStore,
                       ConfigBackingStore configBackingStore,
                       WorkerGroupMember member,
                       String restUrl,
                       ConnectMetrics metrics,
                       Time time) {
-        super(worker, workerId, statusBackingStore, configBackingStore);
+        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
 
         this.time = time;
         this.herderMetrics = new HerderMetrics(metrics);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 7afb4e8..4a2f913 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -100,7 +100,7 @@ public class RestServer {
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
-        resourceConfig.register(RootResource.class);
+        resourceConfig.register(new RootResource(herder));
         resourceConfig.register(new ConnectorsResource(herder));
         resourceConfig.register(new ConnectorPluginsResource(herder));
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
index 25ce731..a12751c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
@@ -20,12 +20,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kafka.common.utils.AppInfoParser;
 
 public class ServerInfo {
-    private String version;
-    private String commit;
+    private final String version;
+    private final String commit;
+    private final String kafkaClusterId;
 
-    public ServerInfo() {
-        version = AppInfoParser.getVersion();
-        commit = AppInfoParser.getCommitId();
+    public ServerInfo(String kafkaClusterId) {
+        this.version = AppInfoParser.getVersion();
+        this.commit = AppInfoParser.getCommitId();
+        this.kafkaClusterId = kafkaClusterId;
     }
 
     @JsonProperty
@@ -37,4 +39,9 @@ public class ServerInfo {
     public String commit() {
         return commit;
     }
+
+    @JsonProperty("kafka_cluster_id")
+    public String clusterId() {
+        return kafkaClusterId;
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index 9d94001..9666bf1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;
 
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 
 import javax.ws.rs.GET;
@@ -27,9 +28,15 @@ import javax.ws.rs.core.MediaType;
 @Produces(MediaType.APPLICATION_JSON)
 public class RootResource {
 
+    private final Herder herder;
+
+    public RootResource(Herder herder) {
+        this.herder = herder;
+    }
+
     @GET
     @Path("/")
     public ServerInfo serverInfo() {
-        return new ServerInfo();
+        return new ServerInfo(herder.kafkaClusterId());
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index e9ec0f9..41609cb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -52,16 +52,17 @@ public class StandaloneHerder extends AbstractHerder {
 
     private ClusterConfigState configState;
 
-    public StandaloneHerder(Worker worker) {
-        this(worker, worker.workerId(), new MemoryStatusBackingStore(), new MemoryConfigBackingStore());
+    public StandaloneHerder(Worker worker, String kafkaClusterId) {
+        this(worker, worker.workerId(), kafkaClusterId, new MemoryStatusBackingStore(), new MemoryConfigBackingStore());
     }
 
     // visible for testing
     StandaloneHerder(Worker worker,
                      String workerId,
+                     String kafkaClusterId,
                      StatusBackingStore statusBackingStore,
                      MemoryConfigBackingStore configBackingStore) {
-        super(worker, workerId, statusBackingStore, configBackingStore);
+        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
         this.configState = ClusterConfigState.EMPTY;
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 913ae1f..1945204 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -16,10 +16,20 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
 
 public final class ConnectUtils {
+    private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
+
     public static Long checkAndConvertTimestamp(Long timestamp) {
         if (timestamp == null || timestamp >= 0)
             return timestamp;
@@ -28,4 +38,28 @@ public final class ConnectUtils {
         else
             throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp));
     }
+
+    public static String lookupKafkaClusterId(WorkerConfig config) {
+        try (AdminClient adminClient = AdminClient.create(config.originals())) {
+            return lookupKafkaClusterId(adminClient);
+        }
+    }
+
+    static String lookupKafkaClusterId(AdminClient adminClient) {
+        log.debug("Looking up Kafka cluster ID");
+        try {
+            KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
+            if (clusterIdFuture == null) {
+                log.info("Kafka cluster version is too old to return cluster ID");
+                return null;
+            }
+            String kafkaClusterId = clusterIdFuture.get();
+            log.info("Kafka cluster ID: {}", kafkaClusterId);
+            return kafkaClusterId;
+        } catch (InterruptedException e) {
+            throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
+        } catch (ExecutionException e) {
+            throw new ConnectException("Failed to connect to and describe Kafka cluster", e);
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index ac9c312..500d31f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -51,6 +51,7 @@ import static org.junit.Assert.assertTrue;
 public class AbstractHerderTest extends EasyMockSupport {
     private final Worker worker = strictMock(Worker.class);
     private final String workerId = "workerId";
+    private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
     private final int generation = 5;
     private final String connector = "connector";
     private final Plugins plugins = strictMock(Plugins.class);
@@ -64,8 +65,8 @@ public class AbstractHerderTest extends EasyMockSupport {
         StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
                 .addMockedMethod("generation")
                 .createMock();
 
@@ -107,8 +108,8 @@ public class AbstractHerderTest extends EasyMockSupport {
         StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
                 .addMockedMethod("generation")
                 .createMock();
 
@@ -228,8 +229,8 @@ public class AbstractHerderTest extends EasyMockSupport {
         StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
-                .withArgs(worker, workerId, statusStore, configStore)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
                 .addMockedMethod("generation")
                 .createMock();
         EasyMock.expect(herder.generation()).andStubReturn(generation);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d41ccbe..d7307cf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -149,6 +149,7 @@ public class DistributedHerderTest {
             TASK_CONFIGS_MAP, Collections.<String>emptySet());
 
     private static final String WORKER_ID = "localhost:8083";
+    private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
 
     @Mock private ConfigBackingStore configBackingStore;
     @Mock private StatusBackingStore statusBackingStore;
@@ -178,8 +179,10 @@ public class DistributedHerderTest {
         worker = PowerMock.createMock(Worker.class);
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
 
-        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
-                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
+        herder = PowerMock.createPartialMock(DistributedHerder.class,
+                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
+                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
+                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
new file mode 100644
index 0000000..4e928a3
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(EasyMockRunner.class)
+public class RootResourceTest extends EasyMockSupport {
+
+    @Mock
+    private Herder herder;
+    private RootResource rootResource;
+
+    @Before
+    public void setUp() {
+        rootResource = new RootResource(herder);
+    }
+
+    @Test
+    public void testRootGet() {
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(MockAdminClient.DEFAULT_CLUSTER_ID);
+
+        replayAll();
+
+        ServerInfo info = rootResource.serverInfo();
+        assertEquals(AppInfoParser.getVersion(), info.version());
+        assertEquals(AppInfoParser.getCommitId(), info.commit());
+        assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, info.clusterId());
+
+        verifyAll();
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 18d2739..79be45b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -89,6 +89,7 @@ public class StandaloneHerderTest {
     private static final String TOPICS_LIST_STR = "topic1,topic2";
     private static final int DEFAULT_MAX_TASKS = 1;
     private static final String WORKER_ID = "localhost:8083";
+    private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
 
     private enum SourceSink {
         SOURCE, SINK
@@ -110,7 +111,7 @@ public class StandaloneHerderTest {
     public void setup() {
         worker = PowerMock.createMock(Worker.class);
         herder = PowerMock.createPartialMock(StandaloneHerder.class, new String[]{"connectorTypeForClass"},
-            worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
+            worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore());
         plugins = PowerMock.createMock(Plugins.class);
         pluginLoader = PowerMock.createMock(PluginClassLoader.class);
         delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
new file mode 100644
index 0000000..6be3525
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectUtilsTest {
+
+    @Test
+    public void testLookupKafkaClusterId() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient(cluster, broker1);
+
+        assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(adminClient));
+    }
+
+    @Test
+    public void testLookupNullKafkaClusterId() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient(cluster, broker1, null);
+
+        assertNull(ConnectUtils.lookupKafkaClusterId(adminClient));
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testLookupKafkaClusterIdTimeout() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient(cluster, broker1);
+        adminClient.timeoutNextRequest(1);
+
+        ConnectUtils.lookupKafkaClusterId(adminClient);
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index d9189d4..c50d358 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -71,7 +71,7 @@ public class InternalTopicManagerTest {
 
     @Before
     public void init() {
-        mockAdminClient = new MockAdminClient(cluster);
+        mockAdminClient = new MockAdminClient(cluster, broker1);
         internalTopicManager = new InternalTopicManager(
             mockAdminClient,
             new StreamsConfig(config));

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <co...@kafka.apache.org>.