You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/05 15:52:53 UTC
[kafka] branch trunk updated: KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) (#4314)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bfb272c KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) (#4314)
bfb272c is described below
commit bfb272c5cdb227040d862f0ab7337df68f787764
Author: Ewen Cheslack-Postava <me...@ewencp.org>
AuthorDate: Fri Jan 5 07:52:50 2018 -0800
KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) (#4314)
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/admin/MockAdminClient.java | 47 +++++++++++++++-
.../kafka/connect/cli/ConnectDistributed.java | 6 ++-
.../kafka/connect/cli/ConnectStandalone.java | 5 +-
.../kafka/connect/runtime/AbstractHerder.java | 8 +++
.../org/apache/kafka/connect/runtime/Herder.java | 7 +++
.../runtime/distributed/DistributedHerder.java | 6 ++-
.../kafka/connect/runtime/rest/RestServer.java | 2 +-
.../connect/runtime/rest/entities/ServerInfo.java | 17 ++++--
.../runtime/rest/resources/RootResource.java | 9 +++-
.../runtime/standalone/StandaloneHerder.java | 7 +--
.../apache/kafka/connect/util/ConnectUtils.java | 34 ++++++++++++
.../kafka/connect/runtime/AbstractHerderTest.java | 13 ++---
.../runtime/distributed/DistributedHerderTest.java | 7 ++-
.../runtime/rest/resources/RootResourceTest.java | 58 ++++++++++++++++++++
.../runtime/standalone/StandaloneHerderTest.java | 3 +-
.../kafka/connect/util/ConnectUtilsTest.java | 63 ++++++++++++++++++++++
.../internals/InternalTopicManagerTest.java | 2 +-
17 files changed, 268 insertions(+), 26 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index d00bad0..b6a5888 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,13 +38,41 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
public class MockAdminClient extends AdminClient {
+ public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
+
private final List<Node> brokers;
private final Map<String, TopicMetadata> allTopics = new HashMap<>();
+ private final String clusterId;
+ private Node controller;
private int timeoutNextRequests = 0;
- public MockAdminClient(List<Node> brokers) {
+ /**
+ * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from
+ * DEFAULT_CLUSTER_ID.
+ *
+ * @param brokers list of brokers in the cluster
+ * @param controller node that should start as the controller
+ */
+ public MockAdminClient(List<Node> brokers, Node controller) {
+ this(brokers, controller, DEFAULT_CLUSTER_ID);
+ }
+
+ /**
+ * Creates MockAdminClient for a cluster with the given brokers.
+ * @param brokers list of brokers in the cluster
+ * @param controller node that should start as the controller
+ */
+ public MockAdminClient(List<Node> brokers, Node controller, String clusterId) {
this.brokers = brokers;
+ controller(controller);
+ this.clusterId = clusterId;
+ }
+
+ public void controller(Node controller) {
+ if (!brokers.contains(controller))
+ throw new IllegalArgumentException("The controller node must be in the list of brokers");
+ this.controller = controller;
}
public void addTopic(boolean internal,
@@ -82,7 +110,22 @@ public class MockAdminClient extends AdminClient {
@Override
public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
- throw new UnsupportedOperationException("Not implemented yet");
+ KafkaFutureImpl<Collection<Node>> nodesFuture = new KafkaFutureImpl<>();
+ KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
+ KafkaFutureImpl<String> brokerIdFuture = new KafkaFutureImpl<>();
+
+ if (timeoutNextRequests > 0) {
+ nodesFuture.completeExceptionally(new TimeoutException());
+ controllerFuture.completeExceptionally(new TimeoutException());
+ brokerIdFuture.completeExceptionally(new TimeoutException());
+ --timeoutNextRequests;
+ } else {
+ nodesFuture.complete(brokers);
+ controllerFuture.complete(controller);
+ brokerIdFuture.complete(clusterId);
+ }
+
+ return new DescribeClusterResult(nodesFuture, controllerFuture, brokerIdFuture);
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 1b2f94e..98a77ed 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +72,8 @@ public class ConnectDistributed {
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);
+ String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+
RestServer rest = new RestServer(config);
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@@ -85,7 +88,8 @@ public class ConnectDistributed {
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
- DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
+ DistributedHerder herder = new DistributedHerder(config, time, worker,
+ kafkaClusterId, statusBackingStore, configBackingStore,
advertisedUrl.toString());
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 551c6b5..1769905 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,13 +77,15 @@ public class ConnectStandalone {
plugins.compareAndSwapWithDelegatingLoader();
StandaloneConfig config = new StandaloneConfig(workerProps);
+ String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+
RestServer rest = new RestServer(config);
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore());
- Herder herder = new StandaloneHerder(worker);
+ Herder herder = new StandaloneHerder(worker, kafkaClusterId);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index fbe0ae2..02465c9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -79,6 +79,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
private final String workerId;
protected final Worker worker;
+ private final String kafkaClusterId;
protected final StatusBackingStore statusBackingStore;
protected final ConfigBackingStore configBackingStore;
@@ -86,14 +87,21 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
public AbstractHerder(Worker worker,
String workerId,
+ String kafkaClusterId,
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore) {
this.worker = worker;
this.workerId = workerId;
+ this.kafkaClusterId = kafkaClusterId;
this.statusBackingStore = statusBackingStore;
this.configBackingStore = configBackingStore;
}
+ @Override
+ public String kafkaClusterId() {
+ return kafkaClusterId;
+ }
+
protected abstract int generation();
protected void startServices() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5dfb808..855b08a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -176,6 +176,13 @@ public interface Herder {
*/
Plugins plugins();
+
+ /**
+ * Get the cluster ID of the Kafka cluster backing this Connect cluster.
+ * @return the cluster ID of the Kafka cluster backing this connect cluster
+ */
+ String kafkaClusterId();
+
class Created<T> {
private final boolean created;
private final T result;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index a1cc56a..c39ee67 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -149,10 +149,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
public DistributedHerder(DistributedConfig config,
Time time,
Worker worker,
+ String kafkaClusterId,
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
String restUrl) {
- this(config, worker, worker.workerId(), statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
+ this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(), time);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
@@ -160,13 +161,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
DistributedHerder(DistributedConfig config,
Worker worker,
String workerId,
+ String kafkaClusterId,
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
WorkerGroupMember member,
String restUrl,
ConnectMetrics metrics,
Time time) {
- super(worker, workerId, statusBackingStore, configBackingStore);
+ super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
this.time = time;
this.herderMetrics = new HerderMetrics(metrics);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 7afb4e8..4a2f913 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -100,7 +100,7 @@ public class RestServer {
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.register(new JacksonJsonProvider());
- resourceConfig.register(RootResource.class);
+ resourceConfig.register(new RootResource(herder));
resourceConfig.register(new ConnectorsResource(herder));
resourceConfig.register(new ConnectorPluginsResource(herder));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
index 25ce731..a12751c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
@@ -20,12 +20,14 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.common.utils.AppInfoParser;
public class ServerInfo {
- private String version;
- private String commit;
+ private final String version;
+ private final String commit;
+ private final String kafkaClusterId;
- public ServerInfo() {
- version = AppInfoParser.getVersion();
- commit = AppInfoParser.getCommitId();
+ public ServerInfo(String kafkaClusterId) {
+ this.version = AppInfoParser.getVersion();
+ this.commit = AppInfoParser.getCommitId();
+ this.kafkaClusterId = kafkaClusterId;
}
@JsonProperty
@@ -37,4 +39,9 @@ public class ServerInfo {
public String commit() {
return commit;
}
+
+ @JsonProperty("kafka_cluster_id")
+ public String clusterId() {
+ return kafkaClusterId;
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index 9d94001..9666bf1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.rest.resources;
+import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import javax.ws.rs.GET;
@@ -27,9 +28,15 @@ import javax.ws.rs.core.MediaType;
@Produces(MediaType.APPLICATION_JSON)
public class RootResource {
+ private final Herder herder;
+
+ public RootResource(Herder herder) {
+ this.herder = herder;
+ }
+
@GET
@Path("/")
public ServerInfo serverInfo() {
- return new ServerInfo();
+ return new ServerInfo(herder.kafkaClusterId());
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index e9ec0f9..41609cb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -52,16 +52,17 @@ public class StandaloneHerder extends AbstractHerder {
private ClusterConfigState configState;
- public StandaloneHerder(Worker worker) {
- this(worker, worker.workerId(), new MemoryStatusBackingStore(), new MemoryConfigBackingStore());
+ public StandaloneHerder(Worker worker, String kafkaClusterId) {
+ this(worker, worker.workerId(), kafkaClusterId, new MemoryStatusBackingStore(), new MemoryConfigBackingStore());
}
// visible for testing
StandaloneHerder(Worker worker,
String workerId,
+ String kafkaClusterId,
StatusBackingStore statusBackingStore,
MemoryConfigBackingStore configBackingStore) {
- super(worker, workerId, statusBackingStore, configBackingStore);
+ super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore);
this.configState = ClusterConfigState.EMPTY;
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 913ae1f..1945204 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -16,10 +16,20 @@
*/
package org.apache.kafka.connect.util;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
public final class ConnectUtils {
+ private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
+
public static Long checkAndConvertTimestamp(Long timestamp) {
if (timestamp == null || timestamp >= 0)
return timestamp;
@@ -28,4 +38,28 @@ public final class ConnectUtils {
else
throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp));
}
+
+ public static String lookupKafkaClusterId(WorkerConfig config) {
+ try (AdminClient adminClient = AdminClient.create(config.originals())) {
+ return lookupKafkaClusterId(adminClient);
+ }
+ }
+
+ static String lookupKafkaClusterId(AdminClient adminClient) {
+ log.debug("Looking up Kafka cluster ID");
+ try {
+ KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
+ if (clusterIdFuture == null) {
+ log.info("Kafka cluster version is too old to return cluster ID");
+ return null;
+ }
+ String kafkaClusterId = clusterIdFuture.get();
+ log.info("Kafka cluster ID: {}", kafkaClusterId);
+ return kafkaClusterId;
+ } catch (InterruptedException e) {
+ throw new ConnectException("Unexpectedly interrupted when looking up Kafka cluster info", e);
+ } catch (ExecutionException e) {
+ throw new ConnectException("Failed to connect to and describe Kafka cluster", e);
+ }
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index ac9c312..500d31f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -51,6 +51,7 @@ import static org.junit.Assert.assertTrue;
public class AbstractHerderTest extends EasyMockSupport {
private final Worker worker = strictMock(Worker.class);
private final String workerId = "workerId";
+ private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
private final int generation = 5;
private final String connector = "connector";
private final Plugins plugins = strictMock(Plugins.class);
@@ -64,8 +65,8 @@ public class AbstractHerderTest extends EasyMockSupport {
StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
- .withArgs(worker, workerId, statusStore, configStore)
+ .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
.addMockedMethod("generation")
.createMock();
@@ -107,8 +108,8 @@ public class AbstractHerderTest extends EasyMockSupport {
StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
- .withArgs(worker, workerId, statusStore, configStore)
+ .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
.addMockedMethod("generation")
.createMock();
@@ -228,8 +229,8 @@ public class AbstractHerderTest extends EasyMockSupport {
StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
- .withArgs(worker, workerId, statusStore, configStore)
+ .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
+ .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
.addMockedMethod("generation")
.createMock();
EasyMock.expect(herder.generation()).andStubReturn(generation);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d41ccbe..d7307cf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -149,6 +149,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP, Collections.<String>emptySet());
private static final String WORKER_ID = "localhost:8083";
+ private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@Mock private ConfigBackingStore configBackingStore;
@Mock private StatusBackingStore statusBackingStore;
@@ -178,8 +179,10 @@ public class DistributedHerderTest {
worker = PowerMock.createMock(Worker.class);
EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
- herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
- new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
+ herder = PowerMock.createPartialMock(DistributedHerder.class,
+ new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
+ new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
+ statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
configUpdateListener = herder.new ConfigUpdateListener();
rebalanceListener = herder.new RebalanceListener();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
new file mode 100644
index 0000000..4e928a3
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(EasyMockRunner.class)
+public class RootResourceTest extends EasyMockSupport {
+
+ @Mock
+ private Herder herder;
+ private RootResource rootResource;
+
+ @Before
+ public void setUp() {
+ rootResource = new RootResource(herder);
+ }
+
+ @Test
+ public void testRootGet() {
+ EasyMock.expect(herder.kafkaClusterId()).andReturn(MockAdminClient.DEFAULT_CLUSTER_ID);
+
+ replayAll();
+
+ ServerInfo info = rootResource.serverInfo();
+ assertEquals(AppInfoParser.getVersion(), info.version());
+ assertEquals(AppInfoParser.getCommitId(), info.commit());
+ assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, info.clusterId());
+
+ verifyAll();
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 18d2739..79be45b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -89,6 +89,7 @@ public class StandaloneHerderTest {
private static final String TOPICS_LIST_STR = "topic1,topic2";
private static final int DEFAULT_MAX_TASKS = 1;
private static final String WORKER_ID = "localhost:8083";
+ private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
private enum SourceSink {
SOURCE, SINK
@@ -110,7 +111,7 @@ public class StandaloneHerderTest {
public void setup() {
worker = PowerMock.createMock(Worker.class);
herder = PowerMock.createPartialMock(StandaloneHerder.class, new String[]{"connectorTypeForClass"},
- worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
+ worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore());
plugins = PowerMock.createMock(Plugins.class);
pluginLoader = PowerMock.createMock(PluginClassLoader.class);
delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
new file mode 100644
index 0000000..6be3525
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ConnectUtilsTest {
+
+ @Test
+ public void testLookupKafkaClusterId() {
+ final Node broker1 = new Node(0, "dummyHost-1", 1234);
+ final Node broker2 = new Node(1, "dummyHost-2", 1234);
+ List<Node> cluster = Arrays.asList(broker1, broker2);
+ MockAdminClient adminClient = new MockAdminClient(cluster, broker1);
+
+ assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(adminClient));
+ }
+
+ @Test
+ public void testLookupNullKafkaClusterId() {
+ final Node broker1 = new Node(0, "dummyHost-1", 1234);
+ final Node broker2 = new Node(1, "dummyHost-2", 1234);
+ List<Node> cluster = Arrays.asList(broker1, broker2);
+ MockAdminClient adminClient = new MockAdminClient(cluster, broker1, null);
+
+ assertNull(ConnectUtils.lookupKafkaClusterId(adminClient));
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testLookupKafkaClusterIdTimeout() {
+ final Node broker1 = new Node(0, "dummyHost-1", 1234);
+ final Node broker2 = new Node(1, "dummyHost-2", 1234);
+ List<Node> cluster = Arrays.asList(broker1, broker2);
+ MockAdminClient adminClient = new MockAdminClient(cluster, broker1);
+ adminClient.timeoutNextRequest(1);
+
+ ConnectUtils.lookupKafkaClusterId(adminClient);
+ }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index d9189d4..c50d358 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -71,7 +71,7 @@ public class InternalTopicManagerTest {
@Before
public void init() {
- mockAdminClient = new MockAdminClient(cluster);
+ mockAdminClient = new MockAdminClient(cluster, broker1);
internalTopicManager = new InternalTopicManager(
mockAdminClient,
new StreamsConfig(config));
--
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <co...@kafka.apache.org>.