You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/09 15:50:15 UTC
[kafka] branch trunk updated: KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2 (#13137)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f93d5af8392 KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2 (#13137)
f93d5af8392 is described below
commit f93d5af8392e01f29b853cc44c6f78d74ead41be
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Thu Feb 9 10:50:07 2023 -0500
KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2 (#13137)
Reviewers: Daniel Urban <du...@cloudera.com>, Greg Harris <gr...@aiven.io>, Viktor Somogyi-Vass <vi...@gmail.com>, Mickael Maison <mi...@gmail.com>
---
build.gradle | 11 +
checkstyle/import-control.xml | 3 +
checkstyle/suppressions.xml | 2 +-
.../apache/kafka/connect/mirror/MirrorMaker.java | 65 ++-
.../kafka/connect/mirror/MirrorMakerConfig.java | 41 +-
.../connect/mirror/rest/MirrorRestServer.java | 59 +++
.../rest/resources/InternalMirrorResource.java | 65 +++
.../DedicatedMirrorIntegrationTest.java | 290 +++++++++++++
.../kafka/connect/cli/AbstractConnectCli.java | 3 +-
.../kafka/connect/cli/ConnectDistributed.java | 4 +-
.../org/apache/kafka/connect/runtime/Connect.java | 5 +-
.../apache/kafka/connect/runtime/WorkerConfig.java | 229 +---------
.../runtime/distributed/DistributedConfig.java | 2 +-
.../runtime/distributed/DistributedHerder.java | 50 ++-
.../connect/runtime/rest/ConnectRestServer.java | 69 +++
.../connect/runtime/rest/HerderRequestHandler.java | 142 +++++++
.../kafka/connect/runtime/rest/RestClient.java | 7 +-
.../kafka/connect/runtime/rest/RestServer.java | 120 ++++--
.../connect/runtime/rest/RestServerConfig.java | 463 +++++++++++++++++++++
.../runtime/rest/resources/ConnectorsResource.java | 169 +-------
.../rest/resources/InternalClusterResource.java | 115 +++++
.../rest/resources/InternalConnectResource.java | 39 ++
.../kafka/connect/runtime/rest/util/SSLUtils.java | 8 +-
.../integration/RestExtensionIntegrationTest.java | 2 +-
.../integration/RestForwardingIntegrationTest.java | 17 +-
.../kafka/connect/runtime/WorkerConfigTest.java | 135 ------
.../runtime/distributed/DistributedHerderTest.java | 4 +-
.../connect/runtime/isolation/PluginsTest.java | 8 +-
...tServerTest.java => ConnectRestServerTest.java} | 156 ++++---
.../connect/runtime/rest/RestServerConfigTest.java | 167 ++++++++
.../rest/resources/ConnectorsResourceTest.java | 152 +------
.../resources/InternalConnectResourceTest.java | 224 ++++++++++
.../connect/runtime/rest/util/SSLUtilsTest.java | 38 +-
.../util/clusters/EmbeddedConnectCluster.java | 2 +-
34 files changed, 2014 insertions(+), 852 deletions(-)
diff --git a/build.gradle b/build.gradle
index 92f2fd1970d..b0b7e7f91dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2865,6 +2865,17 @@ project(':connect:mirror') {
implementation libs.argparse4j
implementation libs.jacksonAnnotations
implementation libs.slf4jApi
+ implementation libs.jacksonAnnotations
+ implementation libs.jacksonJaxrsJsonProvider
+ implementation libs.jerseyContainerServlet
+ implementation libs.jerseyHk2
+ implementation libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9
+ implementation libs.activation // Jersey dependency that was available in the JDK before Java 9
+ implementation libs.jettyServer
+ implementation libs.jettyServlet
+ implementation libs.jettyServlets
+ implementation libs.jettyClient
+ implementation libs.swaggerAnnotations
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index aa9917dfe6e..d787d4cbb4b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -594,6 +594,9 @@
<allow pkg="org.apache.kafka.connect.integration" />
<allow pkg="org.apache.kafka.connect.mirror" />
<allow pkg="kafka.server" />
+ <subpackage name="rest">
+ <allow pkg="javax.ws.rs" />
+ </subpackage>
</subpackage>
<subpackage name="runtime">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index fefd97e90d1..74358a2b3a9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -138,7 +138,7 @@
<suppress checks="ParameterNumber"
files="Worker(SinkTask|SourceTask|Coordinator).java"/>
<suppress checks="ParameterNumber"
- files="ConfigKeyInfo.java"/>
+ files="(ConfigKeyInfo|DistributedHerder).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(RestServer|AbstractHerder|DistributedHerder|Worker).java"/>
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index ae39c6cd79a..d33d6b4f686 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.connect.mirror.rest.MirrorRestServer;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.Worker;
@@ -26,6 +27,7 @@ import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
@@ -46,6 +48,9 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.ArgumentParsers;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,11 +106,13 @@ public class MirrorMaker {
private CountDownLatch stopLatch;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ShutdownHook shutdownHook;
- private final String advertisedBaseUrl;
+ private final String advertisedUrl;
private final Time time;
private final MirrorMakerConfig config;
private final Set<String> clusters;
private final Set<SourceAndTarget> herderPairs;
+ private final MirrorRestServer internalServer;
+ private final RestClient restClient;
/**
* @param config MM2 configuration from mm2.properties file
@@ -117,7 +124,16 @@ public class MirrorMaker {
public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time time) {
log.debug("Kafka MirrorMaker instance created");
this.time = time;
- this.advertisedBaseUrl = "NOTUSED";
+ if (config.enableInternalRest()) {
+ this.restClient = new RestClient(config);
+ internalServer = new MirrorRestServer(config.originals(), restClient);
+ internalServer.initializeServer();
+ this.advertisedUrl = internalServer.advertisedUrl().toString();
+ } else {
+ internalServer = null;
+ restClient = null;
+ this.advertisedUrl = "NOTUSED";
+ }
this.config = config;
if (clusters != null && !clusters.isEmpty()) {
this.clusters = new HashSet<>(clusters);
@@ -171,6 +187,10 @@ public class MirrorMaker {
startLatch.countDown();
}
}
+ if (internalServer != null) {
+ log.info("Initializing internal REST resources");
+ internalServer.initializeInternalResources(herders);
+ }
log.info("Configuring connectors...");
herderPairs.forEach(this::configureConnectors);
log.info("Kafka MirrorMaker started");
@@ -180,6 +200,9 @@ public class MirrorMaker {
boolean wasShuttingDown = shutdown.getAndSet(true);
if (!wasShuttingDown) {
log.info("Kafka MirrorMaker stopping");
+ if (internalServer != null) {
+ Utils.closeQuietly(internalServer::stop, "Internal REST server");
+ }
for (Herder herder : herders.values()) {
try {
herder.stop();
@@ -204,11 +227,13 @@ public class MirrorMaker {
Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass);
herders.get(sourceAndTarget)
.putConnectorConfig(connectorClass.getSimpleName(), connectorProps, true, (e, x) -> {
- if (e instanceof NotLeaderException) {
- // No way to determine if the connector is a leader or not beforehand.
- log.info("Connector {} is a follower. Using existing configuration.", sourceAndTarget);
+ if (e == null) {
+ log.info("{} connector configured for {}.", connectorClass.getSimpleName(), sourceAndTarget);
+ } else if (e instanceof NotLeaderException) {
+ // No way to determine if the herder is a leader or not beforehand.
+ log.info("This node is a follower for {}. Using existing connector configuration.", sourceAndTarget);
} else {
- log.info("Connector {} configured.", sourceAndTarget, e);
+ log.error("Failed to configure {} connector for {}", connectorClass.getSimpleName(), sourceAndTarget, e);
}
});
}
@@ -226,7 +251,14 @@ public class MirrorMaker {
private void addHerder(SourceAndTarget sourceAndTarget) {
log.info("creating herder for " + sourceAndTarget.toString());
Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
- String advertisedUrl = advertisedBaseUrl + "/" + sourceAndTarget.source();
+ List<String> restNamespace;
+ try {
+ String encodedSource = encodePath(sourceAndTarget.source());
+ String encodedTarget = encodePath(sourceAndTarget.target());
+ restNamespace = Arrays.asList(encodedSource, encodedTarget);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("Unable to create encoded URL paths for source and target using UTF-8", e);
+ }
String workerId = sourceAndTarget.toString();
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
@@ -255,13 +287,26 @@ public class MirrorMaker {
// Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
// herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
// tracking the various shared admin objects in this class.
- // Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled
Herder herder = new DistributedHerder(distributedConfig, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
- advertisedUrl, null, clientConfigOverridePolicy, sharedAdmin);
+ advertisedUrl, restClient, clientConfigOverridePolicy,
+ restNamespace, sharedAdmin);
herders.put(sourceAndTarget, herder);
}
+ private static String encodePath(String rawPath) throws UnsupportedEncodingException {
+ return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+ // Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'),
+ // and pluses as '%2B'
+ // But Jetty doesn't decode pluses at all and leaves them as-are in decoded
+ // URLs
+ // So to get around that, we replace pluses in the encoded URL here with '%20',
+ // which is the encoding that Jetty expects for spaces
+ // Jetty will reverse this transformation when evaluating the path parameters
+ // and will return decoded strings with all special characters as they were.
+ .replaceAll("\\+", "%20");
+ }
+
private class ShutdownHook extends Thread {
@Override
public void run() {
@@ -300,7 +345,7 @@ public class MirrorMaker {
Properties props = Utils.loadProps(configFile.getPath());
Map<String, String> config = Utils.propsToStringMap(props);
- MirrorMaker mirrorMaker = new MirrorMaker(config, clusters, Time.SYSTEM);
+ MirrorMaker mirrorMaker = new MirrorMaker(config, clusters);
try {
mirrorMaker.start();
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 19b4e4c4f77..85f4e9d79e4 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import java.util.Map;
import java.util.HashMap;
@@ -81,11 +82,14 @@ public class MirrorMakerConfig extends AbstractConfig {
static final String TARGET_CLUSTER_PREFIX = "target.cluster.";
static final String SOURCE_PREFIX = "source.";
static final String TARGET_PREFIX = "target.";
+ static final String ENABLE_INTERNAL_REST_CONFIG = "dedicated.mode.enable.internal.rest";
+ private static final String ENABLE_INTERNAL_REST_DOC =
+ "Whether to bring up an internal-only REST server that allows multi-node clusters to operate correctly.";
private final Plugins plugins;
public MirrorMakerConfig(Map<?, ?> props) {
- super(CONFIG_DEF, props, true);
+ super(config(), props, true);
plugins = new Plugins(originalsStrings());
}
@@ -93,6 +97,10 @@ public class MirrorMakerConfig extends AbstractConfig {
return new HashSet<>(getList(CLUSTERS_CONFIG));
}
+ public boolean enableInternalRest() {
+ return getBoolean(ENABLE_INTERNAL_REST_CONFIG);
+ }
+
public List<SourceAndTarget> clusterPairs() {
List<SourceAndTarget> pairs = new ArrayList<>();
Set<String> clusters = clusters();
@@ -272,20 +280,25 @@ public class MirrorMakerConfig extends AbstractConfig {
providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
return transformed;
}
-
- protected static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC)
- .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC)
- // security support
- .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
- Type.STRING,
- CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
- in(Utils.enumOptions(SecurityProtocol.class)),
- Importance.MEDIUM,
- CommonClientConfigs.SECURITY_PROTOCOL_DOC)
- .withClientSslSupport()
- .withClientSaslSupport();
+ protected static ConfigDef config() {
+ ConfigDef result = new ConfigDef()
+ .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC)
+ .define(ENABLE_INTERNAL_REST_CONFIG, Type.BOOLEAN, false, Importance.HIGH, ENABLE_INTERNAL_REST_DOC)
+ .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC)
+ // security support
+ .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+ Type.STRING,
+ CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+ in(Utils.enumOptions(SecurityProtocol.class)),
+ Importance.MEDIUM,
+ CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+ .withClientSslSupport()
+ .withClientSaslSupport();
+ RestServerConfig.addInternalConfig(result);
+ return result;
+ }
+
private Map<String, String> stringsWithPrefixStripped(String prefix) {
return originalsStrings().entrySet().stream()
.filter(x -> x.getKey().startsWith(prefix))
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
new file mode 100644
index 00000000000..7f1fe2841a3
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.mirror.rest.resources.InternalMirrorResource;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+public class MirrorRestServer extends RestServer {
+
+ private final RestClient restClient;
+ private Map<SourceAndTarget, Herder> herders;
+
+ public MirrorRestServer(Map<?, ?> props, RestClient restClient) {
+ super(RestServerConfig.forInternal(props));
+ this.restClient = restClient;
+ }
+
+ public void initializeInternalResources(Map<SourceAndTarget, Herder> herders) {
+ this.herders = herders;
+ super.initializeResources();
+ }
+
+ @Override
+ protected Collection<ConnectResource> regularResources() {
+ return Arrays.asList(
+ new InternalMirrorResource(herders, restClient)
+ );
+ }
+
+ @Override
+ protected Collection<ConnectResource> adminResources() {
+ return Collections.emptyList();
+ }
+
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
new file mode 100644
index 00000000000..8b5150f56ac
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+ @Context
+ private UriInfo uriInfo;
+
+ private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+ private final Map<SourceAndTarget, Herder> herders;
+
+ public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+ super(restClient);
+ this.herders = herders;
+ }
+
+ @Override
+ protected Herder herderForRequest() {
+ String source = pathParam("source");
+ String target = pathParam("target");
+ Herder result = herders.get(new SourceAndTarget(source, target));
+ if (result == null) {
+ throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'");
+ }
+ return result;
+ }
+
+ private String pathParam(String name) {
+ String result = uriInfo.getPathParameters().getFirst(name);
+ if (result == null)
+ throw new NotFoundException("Could not parse " + name + " cluster from request path");
+ return result;
+ }
+
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
new file mode 100644
index 00000000000..f78de9bced6
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+ private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+ private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+ private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+ private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+ private Map<String, MirrorMaker> mirrorMakers;
+
+ @BeforeEach
+ public void setup() {
+ kafkaClusters = new HashMap<>();
+ mirrorMakers = new HashMap<>();
+ }
+
+ @AfterEach
+ public void teardown() throws Throwable {
+ AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+ mirrorMakers.forEach((name, mirrorMaker) ->
+ Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+ );
+ kafkaClusters.forEach((name, kafkaCluster) ->
+ Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure)
+ );
+ if (shutdownFailure.get() != null) {
+ throw shutdownFailure.get();
+ }
+ }
+
+ private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) {
+ if (kafkaClusters.containsKey(name))
+ throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name");
+
+ EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties);
+ kafkaClusters.put(name, result);
+
+ result.start();
+
+ return result;
+ }
+
+ private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
+ if (mirrorMakers.containsKey(name))
+ throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name");
+
+ MirrorMaker result = new MirrorMaker(mmProps);
+ mirrorMakers.put(name, result);
+
+ result.start();
+
+ return result;
+ }
+
+ /**
+ * Tests a single-node cluster without the REST server enabled.
+ */
+ @Test
+ public void testSingleNodeCluster() throws Exception {
+ Properties brokerProps = new Properties();
+ EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
+ EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
+
+ clusterA.start();
+ clusterB.start();
+
+ try (Admin adminA = clusterA.createAdminClient();
+ Admin adminB = clusterB.createAdminClient()) {
+
+ // Cluster aliases
+ final String a = "A";
+ final String b = "B";
+ final String ab = a + "->" + b;
+ final String ba = b + "->" + a;
+ final String testTopicPrefix = "test-topic-";
+
+ Map<String, String> mmProps = new HashMap<String, String>() {{
+ put("dedicated.mode.enable.internal.rest", "false");
+ put("listeners", "http://localhost:0");
+ // Refresh topics very frequently to quickly pick up on topics that are created
+ // after the MM2 nodes are brought up during testing
+ put("refresh.topics.interval.seconds", "1");
+ put("clusters", String.join(", ", a, b));
+ put(a + ".bootstrap.servers", clusterA.bootstrapServers());
+ put(b + ".bootstrap.servers", clusterB.bootstrapServers());
+ put(ab + ".enabled", "true");
+ put(ab + ".topics", "^" + testTopicPrefix + ".*");
+ put(ba + ".enabled", "false");
+ put(ba + ".emit.heartbeats.enabled", "false");
+ put("replication.factor", "1");
+ put("checkpoints.topic.replication.factor", "1");
+ put("heartbeats.topic.replication.factor", "1");
+ put("offset-syncs.topic.replication.factor", "1");
+ put("offset.storage.replication.factor", "1");
+ put("status.storage.replication.factor", "1");
+ put("config.storage.replication.factor", "1");
+ }};
+
+ // Bring up a single-node cluster
+ startMirrorMaker("single node", mmProps);
+
+ final int numMessages = 10;
+ String topic = testTopicPrefix + "1";
+
+ // Create the topic on cluster A
+ createTopic(adminA, topic);
+ // and wait for MirrorMaker to create it on cluster B
+ awaitTopicCreation(b, adminB, a + "." + topic);
+
+ // Write data to the topic on cluster A
+ writeToTopic(clusterA, topic, numMessages);
+ // and wait for MirrorMaker to copy it to cluster B
+ awaitTopicContent(clusterB, b, a + "." + topic, numMessages);
+ }
+ }
+
+ /**
+ * Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime
+ * and reconfigure its connectors and their tasks to replicate those topics correctly.
+ * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters">KIP-710</a>
+ * for more detail on the necessity for this test case.
+ */
+ @Test
+ public void testMultiNodeCluster() throws Exception {
+ Properties brokerProps = new Properties();
+ brokerProps.put("transaction.state.log.replication.factor", "1");
+ brokerProps.put("transaction.state.log.min.isr", "1");
+ EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
+ EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
+
+ clusterA.start();
+ clusterB.start();
+
+ try (Admin adminA = clusterA.createAdminClient();
+ Admin adminB = clusterB.createAdminClient()) {
+
+ // Cluster aliases
+ final String a = "A";
+ // Use a convoluted cluster name to ensure URL encoding/decoding works
+ final String b = "B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618";
+ final String ab = a + "->" + b;
+ final String ba = b + "->" + a;
+ final String testTopicPrefix = "test-topic-";
+
+ Map<String, String> mmProps = new HashMap<String, String>() {{
+ put("dedicated.mode.enable.internal.rest", "true");
+ put("listeners", "http://localhost:0");
+ // Refresh topics very frequently to quickly pick up on topics that are created
+ // after the MM2 nodes are brought up during testing
+ put("refresh.topics.interval.seconds", "1");
+ put("clusters", String.join(", ", a, b));
+ put(a + ".bootstrap.servers", clusterA.bootstrapServers());
+ put(b + ".bootstrap.servers", clusterB.bootstrapServers());
+ // Enable exactly-once support to both validate that MirrorMaker can run with
+ // that feature turned on, and to force cross-worker communication before
+ // task startup
+ put(a + ".exactly.once.source.support", "enabled");
+ put(ab + ".enabled", "true");
+ put(ab + ".topics", "^" + testTopicPrefix + ".*");
+ // The name of the offset syncs topic will contain the name of the cluster in
+ // the replication flow that it is _not_ hosted on; create the offset syncs topic
+ // on the target cluster so that its name will contain the source cluster's name
+ // (since the target cluster's name contains characters that are not valid for
+ // use in a topic name)
+ put(ab + ".offset-syncs.topic.location", "target");
+ // Disable b -> a (and heartbeats from it) so that no topics are created that use
+ // the target cluster's name
+ put(ba + ".enabled", "false");
+ put(ba + ".emit.heartbeats.enabled", "false");
+ put("replication.factor", "1");
+ put("checkpoints.topic.replication.factor", "1");
+ put("heartbeats.topic.replication.factor", "1");
+ put("offset-syncs.topic.replication.factor", "1");
+ put("offset.storage.replication.factor", "1");
+ put("status.storage.replication.factor", "1");
+ put("config.storage.replication.factor", "1");
+ }};
+
+ // Bring up a three-node cluster
+ final int numNodes = 3;
+ for (int i = 0; i < numNodes; i++) {
+ startMirrorMaker("node " + i, mmProps);
+ }
+
+ // Create one topic per Kafka cluster per MirrorMaker node
+ final int topicsPerCluster = numNodes;
+ final int messagesPerTopic = 10;
+ for (int i = 0; i < topicsPerCluster; i++) {
+ String topic = testTopicPrefix + i;
+
+ // Create the topic on cluster A
+ createTopic(adminA, topic);
+ // and wait for MirrorMaker to create it on cluster B
+ awaitTopicCreation(b, adminB, a + "." + topic);
+
+ // Write data to the topic on cluster A
+ writeToTopic(clusterA, topic, messagesPerTopic);
+ // and wait for MirrorMaker to copy it to cluster B
+ awaitTopicContent(clusterB, b, a + "." + topic, messagesPerTopic);
+ }
+ }
+ }
+
+ private void createTopic(Admin admin, String name) throws Exception {
+ admin.createTopics(Collections.singleton(new NewTopic(name, 1, (short) 1))).all().get();
+ }
+
+ private void awaitTopicCreation(String clusterName, Admin admin, String topic) throws Exception {
+ waitForCondition(
+ () -> {
+ try {
+ Set<String> allTopics = admin.listTopics().names().get();
+ return allTopics.contains(topic);
+ } catch (Exception e) {
+ log.debug("Failed to check for existence of topic {} on cluster {}", topic, clusterName, e);
+ return false;
+ }
+ },
+ TOPIC_CREATION_TIMEOUT_MS,
+ "topic " + topic + " was not created on cluster " + clusterName + " in time"
+ );
+ }
+
+ private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMessages) {
+ for (int i = 0; i <= numMessages; i++) {
+ cluster.produce(topic, Integer.toString(i));
+ }
+ }
+
+ private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName, String topic, int numMessages) throws Exception {
+ try (Consumer<?, ?> consumer = cluster.createConsumer(Collections.singletonMap(AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
+ consumer.subscribe(Collections.singleton(topic));
+ AtomicInteger messagesRead = new AtomicInteger(0);
+ waitForCondition(
+ () -> {
+ ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(1));
+ return messagesRead.addAndGet(records.count()) >= numMessages;
+ },
+ TOPIC_REPLICATION_TIMEOUT_MS,
+ () -> "could not read " + numMessages + " from topic " + topic + " on cluster " + clusterName + " in time; only read " + messagesRead.get()
+ );
+ }
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
index 8831081bf44..de666c7bd60 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.slf4j.Logger;
@@ -124,7 +125,7 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
RestClient restClient = new RestClient(config);
- RestServer restServer = new RestServer(config, restClient);
+ ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, workerProps);
restServer.initializeServer();
URI advertisedUrl = restServer.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 1da9fa40622..a887a5bccd8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -97,7 +98,8 @@ public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
// herder is stopped. This is easier than having to track and own the lifecycle ourselves.
return new DistributedHerder(config, Time.SYSTEM, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
- restServer.advertisedUrl().toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin);
+ restServer.advertisedUrl().toString(), restClient, connectorClientConfigOverridePolicy,
+ Collections.emptyList(), sharedAdmin);
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index e5ab246c0bd..e16e3bd72a1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,13 +33,13 @@ public class Connect {
private static final Logger log = LoggerFactory.getLogger(Connect.class);
private final Herder herder;
- private final RestServer rest;
+ private final ConnectRestServer rest;
private final CountDownLatch startLatch = new CountDownLatch(1);
private final CountDownLatch stopLatch = new CountDownLatch(1);
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ShutdownHook shutdownHook;
- public Connect(Herder herder, RestServer rest) {
+ public Connect(Herder herder, ConnectRestServer rest) {
log.debug("Kafka Connect instance created");
this.herder = herder;
this.rest = rest;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 388803da946..21386fc2338 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -24,19 +24,15 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.config.SslClientAuth;
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -44,8 +40,6 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
-import org.eclipse.jetty.util.StringUtil;
-
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX;
@@ -57,9 +51,6 @@ public class WorkerConfig extends AbstractConfig {
private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class);
private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
- private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
- Arrays.asList("set", "add", "setDate", "addDate")
- );
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOTSTRAP_SERVERS_DOC
@@ -119,47 +110,6 @@ public class WorkerConfig extends AbstractConfig {
+ "running with exactly-once support.";
public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
- public static final String LISTENERS_CONFIG = "listeners";
- private static final String LISTENERS_DOC
- = "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" +
- " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
- " Leave hostname empty to bind to default interface.\n" +
- " Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084";
- static final List<String> LISTENERS_DEFAULT = Collections.singletonList("http://:8083");
-
- public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
- private static final String REST_ADVERTISED_HOST_NAME_DOC
- = "If this is set, this is the hostname that will be given out to other workers to connect to.";
-
- public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port";
- private static final String REST_ADVERTISED_PORT_DOC
- = "If this is set, this is the port that will be given out to other workers to connect to.";
-
- public static final String REST_ADVERTISED_LISTENER_CONFIG = "rest.advertised.listener";
- private static final String REST_ADVERTISED_LISTENER_DOC
- = "Sets the advertised listener (HTTP or HTTPS) which will be given to other workers to use.";
-
- public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
- protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
- "Value to set the Access-Control-Allow-Origin header to for REST API requests." +
- "To enable cross origin access, set this to the domain of the application that should be permitted" +
- " to access the API, or '*' to allow access from any domain. The default value only allows access" +
- " from the domain of the REST API.";
- protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";
-
- public static final String ACCESS_CONTROL_ALLOW_METHODS_CONFIG = "access.control.allow.methods";
- protected static final String ACCESS_CONTROL_ALLOW_METHODS_DOC =
- "Sets the methods supported for cross origin requests by setting the Access-Control-Allow-Methods header. "
- + "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD.";
- protected static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = "";
-
- public static final String ADMIN_LISTENERS_CONFIG = "admin.listeners";
- protected static final String ADMIN_LISTENERS_DOC = "List of comma-separated URIs the Admin REST API will listen on." +
- " The supported protocols are HTTP and HTTPS." +
- " An empty or blank string will disable this feature." +
- " The default behavior is to use the regular listener (specified by the 'listeners' property).";
- public static final String ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX = "admin.listeners.https.";
-
public static final String PLUGIN_PATH_CONFIG = "plugin.path";
protected static final String PLUGIN_PATH_DOC = "List of paths separated by commas (,) that "
+ "contain plugins (connectors, converters, transformations). The list should consist"
@@ -182,13 +132,6 @@ public class WorkerConfig extends AbstractConfig {
+ "<code>ConfigProvider</code> allows you to replace variable references in connector configurations, "
+ "such as for externalized secrets. ";
- public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
- protected static final String REST_EXTENSION_CLASSES_DOC =
- "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
- + "in the order specified. Implementing the interface "
- + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
- + "Typically used to add custom capability like logging, security, etc. ";
-
public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
"Class name or alias of implementation of <code>ConnectorClientConfigOverridePolicy</code>. Defines what client configurations can be "
@@ -227,17 +170,13 @@ public class WorkerConfig extends AbstractConfig {
+ "to create topics automatically.";
protected static final boolean TOPIC_CREATION_ENABLE_DEFAULT = true;
- public static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config";
- protected static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
- protected static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
-
/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
* @return a ConfigDef with all the common options specified
*/
protected static ConfigDef baseConfigDef() {
- return new ConfigDef()
+ ConfigDef result = new ConfigDef()
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
.define(CLIENT_DNS_LOOKUP_CONFIG,
@@ -258,16 +197,6 @@ public class WorkerConfig extends AbstractConfig {
Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
.define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
- .define(LISTENERS_CONFIG, Type.LIST, LISTENERS_DEFAULT, new ListenersValidator(), Importance.LOW, LISTENERS_DOC)
- .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
- .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
- .define(REST_ADVERTISED_LISTENER_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_LISTENER_DOC)
- .define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
- ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
- ACCESS_CONTROL_ALLOW_ORIGIN_DOC)
- .define(ACCESS_CONTROL_ALLOW_METHODS_CONFIG, Type.STRING,
- ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, Importance.LOW,
- ACCESS_CONTROL_ALLOW_METHODS_DOC)
.define(PLUGIN_PATH_CONFIG,
Type.LIST,
null,
@@ -292,30 +221,37 @@ public class WorkerConfig extends AbstractConfig {
true,
Importance.LOW,
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
- .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
- ConfigDef.Type.STRING, SslClientAuth.NONE.toString(), in(Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
HEADER_CONVERTER_CLASS_DEFAULT,
Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
.define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
Collections.emptyList(),
Importance.LOW, CONFIG_PROVIDERS_DOC)
- .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
- Importance.LOW, REST_EXTENSION_CLASSES_DOC)
- .define(ADMIN_LISTENERS_CONFIG, Type.LIST, null,
- new AdminListenersValidator(), Importance.LOW, ADMIN_LISTENERS_DOC)
.define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
- .define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT,
- Importance.LOW, TOPIC_TRACKING_ENABLE_DOC)
- .define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
- Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC)
.define(TOPIC_CREATION_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_CREATION_ENABLE_DEFAULT, Importance.LOW,
TOPIC_CREATION_ENABLE_DOC)
- .define(RESPONSE_HTTP_HEADERS_CONFIG, Type.STRING, RESPONSE_HTTP_HEADERS_DEFAULT,
- new ResponseHttpHeadersValidator(), Importance.LOW, RESPONSE_HTTP_HEADERS_DOC)
// security support
.withClientSslSupport();
+ addTopicTrackingConfig(result);
+ RestServerConfig.addPublicConfig(result);
+ return result;
+ }
+
+ public static void addTopicTrackingConfig(ConfigDef configDef) {
+ configDef
+ .define(
+ TOPIC_TRACKING_ENABLE_CONFIG,
+ ConfigDef.Type.BOOLEAN,
+ TOPIC_TRACKING_ENABLE_DEFAULT,
+ ConfigDef.Importance.LOW,
+ TOPIC_TRACKING_ENABLE_DOC
+ ).define(
+ TOPIC_TRACKING_ALLOW_RESET_CONFIG,
+ ConfigDef.Type.BOOLEAN,
+ TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
+ ConfigDef.Importance.LOW,
+ TOPIC_TRACKING_ALLOW_RESET_DOC);
}
private String kafkaClusterId;
@@ -396,7 +332,7 @@ public class WorkerConfig extends AbstractConfig {
return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG));
}
- public Integer getRebalanceTimeout() {
+ public Integer rebalanceTimeout() {
return null;
}
@@ -477,125 +413,4 @@ public class WorkerConfig extends AbstractConfig {
logPluginPathConfigProviderWarning(props);
}
- // Visible for testing
- static void validateHttpResponseHeaderConfig(String config) {
- try {
- // validate format
- String[] configTokens = config.trim().split("\\s+", 2);
- if (configTokens.length != 2) {
- throw new ConfigException(String.format("Invalid format of header config '%s'. "
- + "Expected: '[action] [header name]:[header value]'", config));
- }
-
- // validate action
- String method = configTokens[0].trim();
- validateHeaderConfigAction(method);
-
- // validate header name and header value pair
- String header = configTokens[1];
- String[] headerTokens = header.trim().split(":");
- if (headerTokens.length != 2) {
- throw new ConfigException(
- String.format("Invalid format of header name and header value pair '%s'. "
- + "Expected: '[header name]:[header value]'", header));
- }
-
- // validate header name
- String headerName = headerTokens[0].trim();
- if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) {
- throw new ConfigException(String.format("Invalid header name '%s'. "
- + "The '[header name]' cannot contain whitespace", headerName));
- }
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new ConfigException(String.format("Invalid header config '%s'.", config), e);
- }
- }
-
- // Visible for testing
- static void validateHeaderConfigAction(String action) {
- if (HEADER_ACTIONS.stream().noneMatch(action::equalsIgnoreCase)) {
- throw new ConfigException(String.format("Invalid header config action: '%s'. "
- + "Expected one of %s", action, HEADER_ACTIONS));
- }
- }
-
- private static class ListenersValidator implements ConfigDef.Validator {
- @Override
- public void ensureValid(String name, Object value) {
- if (!(value instanceof List)) {
- throw new ConfigException("Invalid value type for listeners (expected list of URLs , ex: http://localhost:8080,https://localhost:8443).");
- }
-
- List<?> items = (List<?>) value;
- if (items.isEmpty()) {
- throw new ConfigException("Invalid value for listeners, at least one URL is expected, ex: http://localhost:8080,https://localhost:8443.");
- }
-
- for (Object item : items) {
- if (!(item instanceof String)) {
- throw new ConfigException("Invalid type for listeners (expected String).");
- }
- if (Utils.isBlank((String) item)) {
- throw new ConfigException("Empty URL found when parsing listeners list.");
- }
- }
- }
-
- @Override
- public String toString() {
- return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
- }
- }
-
- private static class AdminListenersValidator implements ConfigDef.Validator {
- @Override
- public void ensureValid(String name, Object value) {
- if (value == null) {
- return;
- }
-
- if (!(value instanceof List)) {
- throw new ConfigException("Invalid value type for admin.listeners (expected list).");
- }
-
- List<?> items = (List<?>) value;
- if (items.isEmpty()) {
- return;
- }
-
- for (Object item : items) {
- if (!(item instanceof String)) {
- throw new ConfigException("Invalid type for admin.listeners (expected String).");
- }
- if (Utils.isBlank((String) item)) {
- throw new ConfigException("Empty URL found when parsing admin.listeners list.");
- }
- }
- }
-
- @Override
- public String toString() {
- return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
- }
- }
-
- private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
- @Override
- public void ensureValid(String name, Object value) {
- String strValue = (String) value;
- if (Utils.isBlank(strValue)) {
- return;
- }
-
- String[] configs = StringUtil.csvSplit(strValue); // handles and removed surrounding quotes
- Arrays.stream(configs).forEach(WorkerConfig::validateHttpResponseHeaderConfig);
- }
-
- @Override
- public String toString() {
- return "Comma-separated header rules, where each header rule is of the form "
- + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
- + "if any part of a header rule contains a comma";
- }
- }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 1acdae79f80..9cacb6e1ee6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -509,7 +509,7 @@ public class DistributedConfig extends WorkerConfig {
private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
@Override
- public Integer getRebalanceTimeout() {
+ public Integer rebalanceTimeout() {
return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 6f7a372f413..b68cc443ea7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -199,6 +199,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private Set<String> connectorTargetStateChanges = new HashSet<>();
// Access to this map is protected by the herder's monitor
private final Map<String, ZombieFencing> activeZombieFencings = new HashMap<>();
+ private final List<String> restNamespace;
private boolean needsReconfigRebalance;
private volatile boolean fencedFromConfigTopic;
private volatile int generation;
@@ -228,9 +229,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
* @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null
* @param statusBackingStore the backing store for statuses; may not be null
* @param configBackingStore the backing store for connector configurations; may not be null
- * @param restUrl the URL of this herder's REST API; may not be null
+ * @param restUrl the URL of this herder's REST API; may not be null, but may be an arbitrary placeholder
+ * value if this worker does not expose a REST API
+ * @param restClient a REST client that can be used to issue requests to other workers in the cluster; may
+ * be null if inter-worker communication is not enabled
* @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
* in connector configurations; may not be null
+ * @param restNamespace zero or more path elements to prepend to the paths of forwarded REST requests; may be empty, but not null
* @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
* after all services and resources owned by this herder are stopped
*/
@@ -243,10 +248,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
String restUrl,
RestClient restClient,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ List<String> restNamespace,
AutoCloseable... uponShutdown) {
- this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore,
- null, restUrl, restClient, worker.metrics(),
- time, connectorClientConfigOverridePolicy, uponShutdown);
+ this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, restClient, worker.metrics(),
+ time, connectorClientConfigOverridePolicy, restNamespace, uponShutdown);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
@@ -263,6 +268,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
ConnectMetrics metrics,
Time time,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+ List<String> restNamespace,
AutoCloseable... uponShutdown) {
super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
@@ -277,6 +283,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
this.keyGenerator = config.getInternalRequestKeyGenerator();
this.restClient = restClient;
this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+ this.restNamespace = Objects.requireNonNull(restNamespace);
this.uponShutdown = Arrays.asList(uponShutdown);
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
@@ -1162,11 +1169,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
callback.onCompletion(null, null);
} else if (error instanceof NotLeaderException) {
if (restClient != null) {
- String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
- log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
+ String workerUrl = ((NotLeaderException) error).forwardUrl();
+ String fenceUrl = namespacedUrl(workerUrl)
+ .path("connectors")
+ .path(id.connector())
+ .path("fence")
+ .build()
+ .toString();
+ log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), fenceUrl);
forwardRequestExecutor.execute(() -> {
try {
- restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
+ restClient.httpRequest(fenceUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
callback.onCompletion(null, null);
} catch (Throwable t) {
callback.onCompletion(t, null);
@@ -1175,12 +1188,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
} else {
callback.onCompletion(
new ConnectException(
- // TODO: Update this message if KIP-710 is accepted and merged
- // (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
"This worker is not able to communicate with the leader of the cluster, "
+ "which is required for exactly-once source tasks. If running MirrorMaker 2 "
- + "in dedicated mode, consider either disabling exactly-once support, or deploying "
- + "the connectors for MirrorMaker 2 directly onto a distributed Kafka Connect cluster."
+ + "in dedicated mode, consider enabling inter-worker communication via the "
+ + "'dedicated.mode.enable.internal.rest' property."
),
null
);
@@ -1936,12 +1947,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
cb.onCompletion(null, null);
} else if (restClient == null) {
- // TODO: Update this message if KIP-710 is accepted and merged
- // (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, "
+ "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 "
- + "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a "
- + "distributed Kafka Connect cluster.",
+ + "in dedicated mode, consider enabling inter-worker communication via the "
+ + "'dedicated.mode.enable.internal.rest' property.",
leaderUrl()
);
} else {
@@ -1956,7 +1965,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
"because the URL of the leader's REST interface is empty!"), null);
return;
}
- String reconfigUrl = UriBuilder.fromUri(leaderUrl)
+ String reconfigUrl = namespacedUrl(leaderUrl)
.path("connectors")
.path(connName)
.path("tasks")
@@ -1965,6 +1974,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.trace("Forwarding task configurations for connector {} to leader", connName);
restClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
cb.onCompletion(null, null);
+ log.trace("Request to leader to reconfigure connector tasks succeeded");
} catch (ConnectException e) {
log.error("Request to leader to reconfigure connector tasks failed", e);
cb.onCompletion(e, null);
@@ -2460,6 +2470,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
return false;
}
+ private UriBuilder namespacedUrl(String workerUrl) {
+ UriBuilder result = UriBuilder.fromUri(workerUrl);
+ for (String namespacePath : restNamespace) {
+ result = result.path(namespacePath);
+ }
+ return result;
+ }
+
/**
* Represents an active zombie fencing: that is, an in-progress attempt to invoke
* {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
new file mode 100644
index 00000000000..ca9eb731c0c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
+import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource;
+import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
+import org.apache.kafka.connect.runtime.rest.resources.RootResource;
+import org.glassfish.jersey.server.ResourceConfig;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+public class ConnectRestServer extends RestServer {
+
+ private final RestClient restClient;
+ private Herder herder;
+
+ public ConnectRestServer(Integer rebalanceTimeoutMs, RestClient restClient, Map<?, ?> props) {
+ super(RestServerConfig.forPublic(rebalanceTimeoutMs, props));
+ this.restClient = restClient;
+ }
+
+ public void initializeResources(Herder herder) {
+ this.herder = herder;
+ super.initializeResources();
+ }
+
+ @Override
+ protected Collection<ConnectResource> regularResources() {
+ return Arrays.asList(
+ new RootResource(herder),
+ new ConnectorsResource(herder, config, restClient),
+ new InternalConnectResource(herder, restClient),
+ new ConnectorPluginsResource(herder)
+ );
+ }
+
+ @Override
+ protected Collection<ConnectResource> adminResources() {
+ return Arrays.asList(
+ new LoggingResource()
+ );
+ }
+
+ @Override
+ protected void configureRegularResources(ResourceConfig resourceConfig) {
+ registerRestExtensions(herder, resourceConfig);
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
new file mode 100644
index 00000000000..1f71070047c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class HerderRequestHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(HerderRequestHandler.class);
+
+ private final RestClient restClient;
+
+ private long requestTimeoutMs;
+
+ public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
+ this.restClient = restClient;
+ this.requestTimeoutMs = requestTimeoutMs;
+ }
+
+ public void requestTimeoutMs(long requestTimeoutMs) {
+ if (requestTimeoutMs < 1) {
+ throw new IllegalArgumentException("REST request timeout must be positive");
+ }
+ this.requestTimeoutMs = requestTimeoutMs;
+ }
+
+ /**
+ * Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
+ * request to the leader.
+ */
+ public <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
+ String path,
+ String method,
+ HttpHeaders headers,
+ Map<String, String> queryParameters,
+ Object body,
+ TypeReference<U> resultType,
+ Translator<T, U> translator,
+ Boolean forward) throws Throwable {
+ try {
+ return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+
+ if (cause instanceof RequestTargetException) {
+ if (forward == null || forward) {
+ // the only time we allow recursive forwarding is when no forward flag has
+ // been set, which should only be seen by the first worker to handle a user request.
+ // this gives two total hops to resolve the request before giving up.
+ boolean recursiveForward = forward == null;
+ RequestTargetException targetException = (RequestTargetException) cause;
+ String forwardedUrl = targetException.forwardUrl();
+ if (forwardedUrl == null) {
+ // the target didn't know of the leader at this moment.
+ throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+ "Cannot complete request momentarily due to no known leader URL, "
+ + "likely because a rebalance was underway.");
+ }
+ UriBuilder uriBuilder = UriBuilder.fromUri(forwardedUrl)
+ .path(path)
+ .queryParam("forward", recursiveForward);
+ if (queryParameters != null) {
+ queryParameters.forEach(uriBuilder::queryParam);
+ }
+ String forwardUrl = uriBuilder.build().toString();
+ log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
+ return translator.translate(restClient.httpRequest(forwardUrl, method, headers, body, resultType));
+ } else {
+ // we should find the right target for the query within two hops, so if
+ // we don't, it probably means that a rebalance has taken place.
+ throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+ "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
+ }
+ } else if (cause instanceof RebalanceNeededException) {
+ throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+ "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
+ }
+
+ throw cause;
+ } catch (TimeoutException e) {
+ // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+ // error is the best option
+ throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+ } catch (InterruptedException e) {
+ throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+ }
+ }
+
+ public <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
+ TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
+ return completeOrForwardRequest(cb, path, method, headers, null, body, resultType, translator, forward);
+ }
+
+ public <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
+ TypeReference<T> resultType, Boolean forward) throws Throwable {
+ return completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<>(), forward);
+ }
+
+ public <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers,
+ Object body, Boolean forward) throws Throwable {
+ return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<>(), forward);
+ }
+
+ public interface Translator<T, U> {
+ T translate(RestClient.HttpResponse<U> response);
+ }
+
+ public static class IdentityTranslator<T> implements Translator<T, T> {
+ @Override
+ public T translate(RestClient.HttpResponse<T> response) {
+ return response.body();
+ }
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 16ea02ebbdd..d9ba6194124 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -19,8 +19,8 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.Crypto;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
@@ -51,9 +51,10 @@ import java.util.concurrent.TimeoutException;
public class RestClient {
private static final Logger log = LoggerFactory.getLogger(RestClient.class);
private static final ObjectMapper JSON_SERDE = new ObjectMapper();
- private WorkerConfig config;
- public RestClient(WorkerConfig config) {
+ private final AbstractConfig config;
+
+ public RestClient(AbstractConfig config) {
this.config = config;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 280c3609ba7..e74000ff137 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -24,15 +24,10 @@ import org.apache.kafka.connect.health.ConnectClusterDetails;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
-import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
-import org.apache.kafka.connect.runtime.rest.resources.RootResource;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog;
@@ -67,12 +62,10 @@ import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.kafka.connect.runtime.WorkerConfig.ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX;
-
/**
* Embedded server for the REST API that provides the control plane for Kafka Connect workers.
*/
-public class RestServer {
+public abstract class RestServer {
private static final Logger log = LoggerFactory.getLogger(RestServer.class);
// Used to distinguish between Admin connectors and regular REST API connectors when binding admin handlers
@@ -84,9 +77,7 @@ public class RestServer {
private static final String PROTOCOL_HTTP = "http";
private static final String PROTOCOL_HTTPS = "https";
- private final WorkerConfig config;
-
- private final RestClient restClient;
+ protected final RestServerConfig config;
private final ContextHandlerCollection handlers;
private final Server jettyServer;
@@ -96,12 +87,11 @@ public class RestServer {
/**
* Create a REST server for this herder using the specified configs.
*/
- public RestServer(WorkerConfig config, RestClient restClient) {
+ protected RestServer(RestServerConfig config) {
this.config = config;
- this.restClient = restClient;
- List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
- List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
+ List<String> listeners = config.listeners();
+ List<String> adminListeners = config.adminListeners();
jettyServer = new Server();
handlers = new ContextHandlerCollection();
@@ -161,7 +151,7 @@ public class RestServer {
if (PROTOCOL_HTTPS.equals(protocol)) {
SslContextFactory ssl;
if (isAdmin) {
- ssl = SSLUtils.createServerSideSslContextFactory(config, ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX);
+ ssl = SSLUtils.createServerSideSslContextFactory(config, RestServerConfig.ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX);
} else {
ssl = SSLUtils.createServerSideSslContextFactory(config);
}
@@ -210,44 +200,47 @@ public class RestServer {
}
log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
- log.info("REST admin endpoints at " + adminUrl());
+ URI adminUrl = adminUrl();
+ if (adminUrl != null)
+ log.info("REST admin endpoints at " + adminUrl);
}
- public void initializeResources(Herder herder) {
+ protected final void initializeResources() {
log.info("Initializing REST resources");
+ resources = new ArrayList<>();
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.register(new JacksonJsonProvider());
- this.resources = new ArrayList<>();
- resources.add(new RootResource(herder));
- resources.add(new ConnectorsResource(herder, config, restClient));
- resources.add(new ConnectorPluginsResource(herder));
- resources.forEach(resourceConfig::register);
+ Collection<ConnectResource> regularResources = regularResources();
+ regularResources.forEach(resourceConfig::register);
+ resources.addAll(regularResources);
resourceConfig.register(ConnectExceptionMapper.class);
resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
- registerRestExtensions(herder, resourceConfig);
+ configureRegularResources(resourceConfig);
- List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
+ List<String> adminListeners = config.adminListeners();
ResourceConfig adminResourceConfig;
if (adminListeners == null) {
log.info("Adding admin resources to main listener");
adminResourceConfig = resourceConfig;
- LoggingResource loggingResource = new LoggingResource();
- this.resources.add(loggingResource);
- adminResourceConfig.register(loggingResource);
+ Collection<ConnectResource> adminResources = adminResources();
+ resources.addAll(adminResources);
+ adminResources.forEach(adminResourceConfig::register);
+ configureAdminResources(adminResourceConfig);
} else if (adminListeners.size() > 0) {
// TODO: we need to check if these listeners are same as 'listeners'
// TODO: the following code assumes that they are different
log.info("Adding admin resources to admin listener");
adminResourceConfig = new ResourceConfig();
adminResourceConfig.register(new JacksonJsonProvider());
- LoggingResource loggingResource = new LoggingResource();
- this.resources.add(loggingResource);
- adminResourceConfig.register(loggingResource);
+ Collection<ConnectResource> adminResources = adminResources();
+ resources.addAll(adminResources);
+ adminResources.forEach(adminResourceConfig::register);
adminResourceConfig.register(ConnectExceptionMapper.class);
+ configureAdminResources(adminResourceConfig);
} else {
log.info("Skipping adding admin resources");
// set up adminResource but add no handlers to it
@@ -273,21 +266,21 @@ public class RestServer {
contextHandlers.add(adminContext);
}
- String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
+ String allowedOrigins = config.allowedOrigins();
if (!Utils.isBlank(allowedOrigins)) {
FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
filterHolder.setName("cross-origin");
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins);
- String allowedMethods = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG);
+ String allowedMethods = config.allowedMethods();
if (!Utils.isBlank(allowedMethods)) {
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods);
}
context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
}
- String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+ String headerConfig = config.responseHeaders();
if (!Utils.isBlank(headerConfig)) {
- configureHttpResponseHeaderFilter(context);
+ configureHttpResponseHeaderFilter(context, headerConfig);
}
handlers.setHandlers(contextHandlers.toArray(new Handler[0]));
@@ -309,6 +302,44 @@ public class RestServer {
log.info("REST resources initialized; server is started and ready to handle requests");
}
+ /**
+ * @return the {@link ConnectResource resources} that should be registered with the
+ * standard (i.e., non-admin) listener for this server; may be empty, but not null
+ */
+ protected abstract Collection<ConnectResource> regularResources();
+
+ /**
+ * @return the {@link ConnectResource resources} that should be registered with the
+ * admin listener for this server; may be empty, but not null
+ */
+ protected abstract Collection<ConnectResource> adminResources();
+
+ /**
+ * Pluggable hook to customize the regular (i.e., non-admin) resources on this server
+ * after they have been instantiated and registered with the given {@link ResourceConfig}.
+ * This may be used to, for example, add REST extensions via {@link #registerRestExtensions(Herder, ResourceConfig)}.
+ * <p>
+ * <em>N.B.: Classes do <b>not</b> need to register the resources provided in {@link #regularResources()} with
+ * the {@link ResourceConfig} parameter in this method; they are automatically registered by the parent class.</em>
+ * @param resourceConfig the {@link ResourceConfig} that the server's regular listeners are registered with; never null
+ */
+ protected void configureRegularResources(ResourceConfig resourceConfig) {
+ // No-op by default
+ }
+
+ /**
+ * Pluggable hook to customize the admin resources on this server after they have been instantiated and registered
+ * with the given {@link ResourceConfig}. This may be used to, for example, add REST extensions via
+ * {@link #registerRestExtensions(Herder, ResourceConfig)}.
+ * <p>
+ * <em>N.B.: Classes do <b>not</b> need to register the resources provided in {@link #adminResources()} with
+ * the {@link ResourceConfig} parameter in this method; they are automatically registered by the parent class.</em>
+ * @param adminResourceConfig the {@link ResourceConfig} that the server's regular listeners are registered with; never null
+ */
+ protected void configureAdminResources(ResourceConfig adminResourceConfig) {
+ // No-op by default
+ }
+
public URI serverUrl() {
return jettyServer.getURI();
}
@@ -346,13 +377,13 @@ public class RestServer {
ServerConnector serverConnector = findConnector(advertisedSecurityProtocol);
builder.scheme(advertisedSecurityProtocol);
- String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
+ String advertisedHostname = config.advertisedHostName();
if (advertisedHostname != null && !advertisedHostname.isEmpty())
builder.host(advertisedHostname);
else if (serverConnector != null && serverConnector.getHost() != null && serverConnector.getHost().length() > 0)
builder.host(serverConnector.getHost());
- Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
+ Integer advertisedPort = config.advertisedPort();
if (advertisedPort != null)
builder.port(advertisedPort);
else if (serverConnector != null && serverConnector.getPort() > 0)
@@ -376,7 +407,7 @@ public class RestServer {
}
if (adminConnector == null) {
- List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
+ List<String> adminListeners = config.adminListeners();
if (adminListeners == null) {
return advertisedUrl();
} else if (adminListeners.isEmpty()) {
@@ -399,9 +430,9 @@ public class RestServer {
}
String determineAdvertisedProtocol() {
- String advertisedSecurityProtocol = config.getString(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG);
+ String advertisedSecurityProtocol = config.advertisedListener();
if (advertisedSecurityProtocol == null) {
- String listeners = (String) config.originals().get(WorkerConfig.LISTENERS_CONFIG);
+ String listeners = config.rawListeners();
if (listeners == null)
return PROTOCOL_HTTP;
@@ -440,14 +471,14 @@ public class RestServer {
return null;
}
- void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
+ protected final void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
connectRestExtensions = herder.plugins().newPlugins(
- config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+ config.restExtensions(),
config, ConnectRestExtension.class);
long herderRequestTimeoutMs = ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS;
- Integer rebalanceTimeoutMs = config.getRebalanceTimeout();
+ Integer rebalanceTimeoutMs = config.rebalanceTimeoutMs();
if (rebalanceTimeoutMs != null) {
herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
@@ -472,8 +503,7 @@ public class RestServer {
* Register header filter to ServletContextHandler.
* @param context The servlet context handler
*/
- protected void configureHttpResponseHeaderFilter(ServletContextHandler context) {
- String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+ protected void configureHttpResponseHeaderFilter(ServletContextHandler context, String headerConfig) {
FilterHolder headerFilterHolder = new FilterHolder(HeaderFilter.class);
headerFilterHolder.setInitParameter("headerConfig", headerConfig);
context.addFilter(headerFilterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
new file mode 100644
index 00000000000..0d6d06a4a59
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.eclipse.jetty.util.StringUtil;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * Defines the configuration surface for a {@link RestServer} instance, with support for both
+ * {@link #forInternal(Map) internal-only} and {@link #forPublic(Integer, Map) user-facing}
+ * servers. An internal-only server will expose only the endpoints and listeners necessary for
+ * intra-cluster communication; these include the task-write and zombie-fencing endpoints. A
+ * user-facing server will expose these endpoints and, in addition, all endpoints that are part of
+ * the public REST API for Kafka Connect; these include the connector creation, connector
+ * status, configuration validation, and logging endpoints. In addition, a user-facing server will
+ * instantiate any user-configured
+ * {@link RestServerConfig#REST_EXTENSION_CLASSES_CONFIG REST extensions}.
+ */
+public abstract class RestServerConfig extends AbstractConfig {
+
+ public static final String LISTENERS_CONFIG = "listeners";
+ private static final String LISTENERS_DOC
+ = "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" +
+ " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
+ " Leave hostname empty to bind to default interface.\n" +
+ " Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084";
+ // Visible for testing
+ static final List<String> LISTENERS_DEFAULT = Collections.singletonList("http://:8083");
+
+ public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
+ private static final String REST_ADVERTISED_HOST_NAME_DOC
+ = "If this is set, this is the hostname that will be given out to other workers to connect to.";
+
+ public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port";
+ private static final String REST_ADVERTISED_PORT_DOC
+ = "If this is set, this is the port that will be given out to other workers to connect to.";
+
+ public static final String REST_ADVERTISED_LISTENER_CONFIG = "rest.advertised.listener";
+ private static final String REST_ADVERTISED_LISTENER_DOC
+ = "Sets the advertised listener (HTTP or HTTPS) which will be given to other workers to use.";
+
+ public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
+ private static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
+ "Value to set the Access-Control-Allow-Origin header to for REST API requests." +
+ "To enable cross origin access, set this to the domain of the application that should be permitted" +
+ " to access the API, or '*' to allow access from any domain. The default value only allows access" +
+ " from the domain of the REST API.";
+ protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";
+
+ public static final String ACCESS_CONTROL_ALLOW_METHODS_CONFIG = "access.control.allow.methods";
+ private static final String ACCESS_CONTROL_ALLOW_METHODS_DOC =
+ "Sets the methods supported for cross origin requests by setting the Access-Control-Allow-Methods header. "
+ + "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD.";
+ private static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = "";
+
+ public static final String ADMIN_LISTENERS_CONFIG = "admin.listeners";
+ private static final String ADMIN_LISTENERS_DOC = "List of comma-separated URIs the Admin REST API will listen on." +
+ " The supported protocols are HTTP and HTTPS." +
+ " An empty or blank string will disable this feature." +
+ " The default behavior is to use the regular listener (specified by the 'listeners' property).";
+ public static final String ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX = "admin.listeners.https.";
+
+ public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
+ private static final String REST_EXTENSION_CLASSES_DOC =
+ "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
+ + "in the order specified. Implementing the interface "
+ + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
+ + "Typically used to add custom capability like logging, security, etc. ";
+
+ // Visible for testing
+ static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config";
+ // Visible for testing
+ static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
+ // Visible for testing
+ static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
+ private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
+ Arrays.asList("set", "add", "setDate", "addDate")
+ );
+
+
+ /**
+ * @return the listeners to use for this server, or empty if no admin endpoints should be exposed,
+ * or null if the admin endpoints should be exposed on the {@link #listeners() regular listeners} for
+ * this server
+ */
+ public abstract List<String> adminListeners();
+
+ /**
+ * @return a list of {@link #REST_EXTENSION_CLASSES_CONFIG REST extension} classes
+ * to instantiate and use with the server
+ */
+ public abstract List<String> restExtensions();
+
+ /**
+ * @return whether {@link WorkerConfig#TOPIC_TRACKING_ENABLE_CONFIG topic tracking}
+ * is enabled on this worker
+ */
+ public abstract boolean topicTrackingEnabled();
+
+ /**
+ * @return whether {@link WorkerConfig#TOPIC_TRACKING_ALLOW_RESET_CONFIG topic tracking resets}
+ * are enabled on this worker
+ */
+ public abstract boolean topicTrackingResetEnabled();
+
+ /**
+ * Add the properties related to a user-facing server to the given {@link ConfigDef}.
+ * </p>
+ * This automatically adds the properties for intra-cluster communication; it is not necessary to
+ * invoke both {@link #addInternalConfig(ConfigDef)} and this method on the same {@link ConfigDef}.
+ * @param configDef the {@link ConfigDef} to add the properties to; may not be null
+ */
+ public static void addPublicConfig(ConfigDef configDef) {
+ addInternalConfig(configDef);
+ configDef
+ .define(
+ REST_EXTENSION_CLASSES_CONFIG,
+ ConfigDef.Type.LIST,
+ "",
+ ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC
+ ).define(ADMIN_LISTENERS_CONFIG,
+ ConfigDef.Type.LIST,
+ null,
+ new AdminListenersValidator(),
+ ConfigDef.Importance.LOW,
+ ADMIN_LISTENERS_DOC);
+ }
+
+ /**
+ * Add the properties related to an internal-only server to the given {@link ConfigDef}.
+ * @param configDef the {@link ConfigDef} to add the properties to; may not be null
+ */
+ public static void addInternalConfig(ConfigDef configDef) {
+ configDef
+ .define(
+ LISTENERS_CONFIG,
+ ConfigDef.Type.LIST,
+ LISTENERS_DEFAULT,
+ new ListenersValidator(),
+ ConfigDef.Importance.LOW,
+ LISTENERS_DOC
+ ).define(
+ REST_ADVERTISED_HOST_NAME_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ ConfigDef.Importance.LOW,
+ REST_ADVERTISED_HOST_NAME_DOC
+ ).define(
+ REST_ADVERTISED_PORT_CONFIG,
+ ConfigDef.Type.INT,
+ null,
+ ConfigDef.Importance.LOW,
+ REST_ADVERTISED_PORT_DOC
+ ).define(
+ REST_ADVERTISED_LISTENER_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ ConfigDef.Importance.LOW,
+ REST_ADVERTISED_LISTENER_DOC
+ ).define(
+ ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG,
+ ConfigDef.Type.STRING,
+ ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT,
+ ConfigDef.Importance.LOW,
+ ACCESS_CONTROL_ALLOW_ORIGIN_DOC
+ ).define(
+ ACCESS_CONTROL_ALLOW_METHODS_CONFIG,
+ ConfigDef.Type.STRING,
+ ACCESS_CONTROL_ALLOW_METHODS_DEFAULT,
+ ConfigDef.Importance.LOW,
+ ACCESS_CONTROL_ALLOW_METHODS_DOC
+ ).define(
+ RESPONSE_HTTP_HEADERS_CONFIG,
+ ConfigDef.Type.STRING,
+ RESPONSE_HTTP_HEADERS_DEFAULT,
+ new ResponseHttpHeadersValidator(),
+ ConfigDef.Importance.LOW,
+ RESPONSE_HTTP_HEADERS_DOC
+ ).define(
+ BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
+ ConfigDef.Type.STRING,
+ SslClientAuth.NONE.toString(),
+ in(Utils.enumOptions(SslClientAuth.class)),
+ ConfigDef.Importance.LOW,
+ BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC);
+ }
+
+ public static RestServerConfig forPublic(Integer rebalanceTimeoutMs, Map<?, ?> props) {
+ return new PublicConfig(rebalanceTimeoutMs, props);
+ }
+
+ public static RestServerConfig forInternal(Map<?, ?> props) {
+ return new InternalConfig(props);
+ }
+
+ public List<String> listeners() {
+ return getList(LISTENERS_CONFIG);
+ }
+
+ public String rawListeners() {
+ return (String) originals().get(LISTENERS_CONFIG);
+ }
+
+ public String allowedOrigins() {
+ return getString(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
+ }
+
+ public String allowedMethods() {
+ return getString(ACCESS_CONTROL_ALLOW_METHODS_CONFIG);
+ }
+
+ public String responseHeaders() {
+ return getString(RESPONSE_HTTP_HEADERS_CONFIG);
+ }
+
+ public String advertisedListener() {
+ return getString(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG);
+ }
+
+ public String advertisedHostName() {
+ return getString(REST_ADVERTISED_HOST_NAME_CONFIG);
+ }
+
+ public Integer advertisedPort() {
+ return getInt(REST_ADVERTISED_PORT_CONFIG);
+ }
+
+ public Integer rebalanceTimeoutMs() {
+ return null;
+ }
+
+ protected RestServerConfig(ConfigDef configDef, Map<?, ?> props) {
+ super(configDef, props);
+ }
+
+ // Visible for testing
+ static void validateHttpResponseHeaderConfig(String config) {
+ try {
+ // validate format
+ String[] configTokens = config.trim().split("\\s+", 2);
+ if (configTokens.length != 2) {
+ throw new ConfigException(String.format("Invalid format of header config '%s'. "
+ + "Expected: '[action] [header name]:[header value]'", config));
+ }
+
+ // validate action
+ String method = configTokens[0].trim();
+ validateHeaderConfigAction(method);
+
+ // validate header name and header value pair
+ String header = configTokens[1];
+ String[] headerTokens = header.trim().split(":");
+ if (headerTokens.length != 2) {
+ throw new ConfigException(
+ String.format("Invalid format of header name and header value pair '%s'. "
+ + "Expected: '[header name]:[header value]'", header));
+ }
+
+ // validate header name
+ String headerName = headerTokens[0].trim();
+ if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) {
+ throw new ConfigException(String.format("Invalid header name '%s'. "
+ + "The '[header name]' cannot contain whitespace", headerName));
+ }
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new ConfigException(String.format("Invalid header config '%s'.", config), e);
+ }
+ }
+
+ // Visible for testing
+ static void validateHeaderConfigAction(String action) {
+ if (HEADER_ACTIONS.stream().noneMatch(action::equalsIgnoreCase)) {
+ throw new ConfigException(String.format("Invalid header config action: '%s'. "
+ + "Expected one of %s", action, HEADER_ACTIONS));
+ }
+ }
+
+ private static class ListenersValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (!(value instanceof List)) {
+ throw new ConfigException("Invalid value type for listeners (expected list of URLs , ex: http://localhost:8080,https://localhost:8443).");
+ }
+
+ List<?> items = (List<?>) value;
+ if (items.isEmpty()) {
+ throw new ConfigException("Invalid value for listeners, at least one URL is expected, ex: http://localhost:8080,https://localhost:8443.");
+ }
+
+ for (Object item : items) {
+ if (!(item instanceof String)) {
+ throw new ConfigException("Invalid type for listeners (expected String).");
+ }
+ if (Utils.isBlank((String) item)) {
+ throw new ConfigException("Empty URL found when parsing listeners list.");
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
+ }
+ }
+
+ private static class AdminListenersValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ return;
+ }
+
+ if (!(value instanceof List)) {
+ throw new ConfigException("Invalid value type for admin.listeners (expected list).");
+ }
+
+ List<?> items = (List<?>) value;
+ if (items.isEmpty()) {
+ return;
+ }
+
+ for (Object item : items) {
+ if (!(item instanceof String)) {
+ throw new ConfigException("Invalid type for admin.listeners (expected String).");
+ }
+ if (Utils.isBlank((String) item)) {
+ throw new ConfigException("Empty URL found when parsing admin.listeners list.");
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
+ }
+ }
+
+ private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ String strValue = (String) value;
+ if (Utils.isBlank(strValue)) {
+ return;
+ }
+
+ String[] configs = StringUtil.csvSplit(strValue); // handles and removed surrounding quotes
+ Arrays.stream(configs).forEach(RestServerConfig::validateHttpResponseHeaderConfig);
+ }
+
+ @Override
+ public String toString() {
+ return "Comma-separated header rules, where each header rule is of the form "
+ + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+ + "if any part of a header rule contains a comma";
+ }
+ }
+
+ private static class InternalConfig extends RestServerConfig {
+
+ private static ConfigDef config() {
+ ConfigDef result = new ConfigDef().withClientSslSupport();
+ addInternalConfig(result);
+ return result;
+ }
+
+ @Override
+ public List<String> adminListeners() {
+ // Disable admin resources (such as the logging resource)
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> restExtensions() {
+ // Disable the use of REST extensions
+ return null;
+ }
+
+ @Override
+ public boolean topicTrackingEnabled() {
+ // Topic tracking is unnecessary if we don't expose a public REST API
+ return false;
+ }
+
+ @Override
+ public boolean topicTrackingResetEnabled() {
+ // Topic tracking is unnecessary if we don't expose a public REST API
+ return false;
+ }
+
+ public InternalConfig(Map<?, ?> props) {
+ super(config(), props);
+ }
+ }
+
+ private static class PublicConfig extends RestServerConfig {
+
+ private final Integer rebalanceTimeoutMs;
+ private static ConfigDef config() {
+ ConfigDef result = new ConfigDef().withClientSslSupport();
+ addPublicConfig(result);
+ WorkerConfig.addTopicTrackingConfig(result);
+ return result;
+ }
+
+ @Override
+ public List<String> adminListeners() {
+ return getList(ADMIN_LISTENERS_CONFIG);
+ }
+
+ @Override
+ public List<String> restExtensions() {
+ return getList(REST_EXTENSION_CLASSES_CONFIG);
+ }
+
+ @Override
+ public Integer rebalanceTimeoutMs() {
+ return rebalanceTimeoutMs;
+ }
+
+ @Override
+ public boolean topicTrackingEnabled() {
+ return getBoolean(WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG);
+ }
+
+ @Override
+ public boolean topicTrackingResetEnabled() {
+ return getBoolean(WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG);
+ }
+
+ public PublicConfig(Integer rebalanceTimeoutMs, Map<?, ?> props) {
+ super(config(), props);
+ this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+ }
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index f81e5daf6fd..5e5d97a3696 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -21,19 +21,15 @@ import com.fasterxml.jackson.core.type.TypeReference;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.core.HttpHeaders;
-import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.distributed.Crypto;
-import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
-import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
-import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
+import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -67,43 +63,33 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.rest.HerderRequestHandler.Translator;
+import static org.apache.kafka.connect.runtime.rest.HerderRequestHandler.IdentityTranslator;
@Path("/connectors")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class ConnectorsResource implements ConnectResource {
private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
- private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE =
- new TypeReference<List<Map<String, String>>>() { };
private final Herder herder;
- private final RestClient restClient;
- private long requestTimeoutMs;
+ private final HerderRequestHandler requestHandler;
@javax.ws.rs.core.Context
private ServletContext context;
private final boolean isTopicTrackingDisabled;
private final boolean isTopicTrackingResetDisabled;
- public ConnectorsResource(Herder herder, WorkerConfig config, RestClient restClient) {
+ public ConnectorsResource(Herder herder, RestServerConfig config, RestClient restClient) {
this.herder = herder;
- this.restClient = restClient;
- isTopicTrackingDisabled = !config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
- isTopicTrackingResetDisabled = !config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG);
- this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
+ this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS);
+ this.isTopicTrackingDisabled = !config.topicTrackingEnabled();
+ this.isTopicTrackingResetDisabled = !config.topicTrackingResetEnabled();
}
@Override
public void requestTimeout(long requestTimeoutMs) {
- if (requestTimeoutMs < 1) {
- throw new IllegalArgumentException("REST request timeout must be positive");
- }
- this.requestTimeoutMs = requestTimeoutMs;
+ requestHandler.requestTimeoutMs(requestTimeoutMs);
}
@GET
@@ -160,7 +146,7 @@ public class ConnectorsResource implements ConnectResource {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.putConnectorConfig(name, configs, false, cb);
- Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
+ Herder.Created<ConnectorInfo> info = requestHandler.completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
URI location = UriBuilder.fromUri("/connectors").path(name).build();
@@ -175,7 +161,7 @@ public class ConnectorsResource implements ConnectResource {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
herder.connectorInfo(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
+ return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
}
@GET
@@ -186,7 +172,7 @@ public class ConnectorsResource implements ConnectResource {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Map<String, String>> cb = new FutureCallback<>();
herder.connectorConfig(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
+ return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
}
@GET
@@ -198,7 +184,7 @@ public class ConnectorsResource implements ConnectResource {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>();
herder.tasksConfig(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward);
+ return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward);
}
@GET
@@ -247,7 +233,7 @@ public class ConnectorsResource implements ConnectResource {
checkAndPutConnectorConfigName(connector, connectorConfig);
herder.putConnectorConfig(connector, connectorConfig, true, cb);
- Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+ Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
"PUT", headers, connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
Response.ResponseBuilder response;
if (createdInfo.created()) {
@@ -273,7 +259,7 @@ public class ConnectorsResource implements ConnectResource {
// For backward compatibility, just restart the connector instance and return OK with no body
FutureCallback<Void> cb = new FutureCallback<>();
herder.restartConnector(connector, cb);
- completeOrForwardRequest(cb, forwardingPath, "POST", headers, null, forward);
+ requestHandler.completeOrForwardRequest(cb, forwardingPath, "POST", headers, null, forward);
return Response.noContent().build();
}
@@ -283,7 +269,7 @@ public class ConnectorsResource implements ConnectResource {
Map<String, String> queryParameters = new HashMap<>();
queryParameters.put("includeTasks", includeTasks.toString());
queryParameters.put("onlyFailed", onlyFailed.toString());
- ConnectorStateInfo stateInfo = completeOrForwardRequest(cb, forwardingPath, "POST", headers, queryParameters, null, new TypeReference<ConnectorStateInfo>() {
+ ConnectorStateInfo stateInfo = requestHandler.completeOrForwardRequest(cb, forwardingPath, "POST", headers, queryParameters, null, new TypeReference<ConnectorStateInfo>() {
}, new IdentityTranslator<>(), forward);
return Response.accepted().entity(stateInfo).build();
}
@@ -314,33 +300,7 @@ public class ConnectorsResource implements ConnectResource {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
herder.taskConfigs(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() {
- }, forward);
- }
-
- @POST
- @Path("/{connector}/tasks")
- @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
- public void putTaskConfigs(final @PathParam("connector") String connector,
- final @Context HttpHeaders headers,
- final @QueryParam("forward") Boolean forward,
- final byte[] requestBody) throws Throwable {
- List<Map<String, String>> taskConfigs = new ObjectMapper().readValue(requestBody, TASK_CONFIGS_TYPE);
- FutureCallback<Void> cb = new FutureCallback<>();
- herder.putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
- completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
- }
-
- @PUT
- @Path("/{connector}/fence")
- @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
- public void fenceZombies(final @PathParam("connector") String connector,
- final @Context HttpHeaders headers,
- final @QueryParam("forward") Boolean forward,
- final byte[] requestBody) throws Throwable {
- FutureCallback<Void> cb = new FutureCallback<>();
- herder.fenceZombieSourceTasks(connector, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
- completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", "PUT", headers, requestBody, forward);
+ return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() { }, forward);
}
@GET
@@ -362,7 +322,7 @@ public class ConnectorsResource implements ConnectResource {
FutureCallback<Void> cb = new FutureCallback<>();
ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
herder.restartTask(taskId, cb);
- completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", headers, null, forward);
+ requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", headers, null, forward);
}
@DELETE
@@ -373,7 +333,7 @@ public class ConnectorsResource implements ConnectResource {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.deleteConnectorConfig(connector, cb);
- completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
+ requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
}
// Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig
@@ -388,95 +348,6 @@ public class ConnectorsResource implements ConnectResource {
}
}
- // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
- // request to the leader.
- private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
- String path,
- String method,
- HttpHeaders headers,
- Map<String, String> queryParameters,
- Object body,
- TypeReference<U> resultType,
- Translator<T, U> translator,
- Boolean forward) throws Throwable {
- try {
- return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
-
- if (cause instanceof RequestTargetException) {
- if (restClient == null) {
- throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
- "Cannot complete request as non-leader with request forwarding disabled");
- } else if (forward == null || forward) {
- // the only time we allow recursive forwarding is when no forward flag has
- // been set, which should only be seen by the first worker to handle a user request.
- // this gives two total hops to resolve the request before giving up.
- boolean recursiveForward = forward == null;
- RequestTargetException targetException = (RequestTargetException) cause;
- String forwardedUrl = targetException.forwardUrl();
- if (forwardedUrl == null) {
- // the target didn't know of the leader at this moment.
- throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
- "Cannot complete request momentarily due to no known leader URL, "
- + "likely because a rebalance was underway.");
- }
- UriBuilder uriBuilder = UriBuilder.fromUri(forwardedUrl)
- .path(path)
- .queryParam("forward", recursiveForward);
- if (queryParameters != null) {
- queryParameters.forEach(uriBuilder::queryParam);
- }
- String forwardUrl = uriBuilder.build().toString();
- log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
- return translator.translate(restClient.httpRequest(forwardUrl, method, headers, body, resultType));
- } else {
- // we should find the right target for the query within two hops, so if
- // we don't, it probably means that a rebalance has taken place.
- throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
- "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
- }
- } else if (cause instanceof RebalanceNeededException) {
- throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
- "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
- }
-
- throw cause;
- } catch (TimeoutException e) {
- // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
- // error is the best option
- throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
- } catch (InterruptedException e) {
- throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
- }
- }
-
- private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
- TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
- return completeOrForwardRequest(cb, path, method, headers, null, body, resultType, translator, forward);
- }
-
- private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
- TypeReference<T> resultType, Boolean forward) throws Throwable {
- return completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<>(), forward);
- }
-
- private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers,
- Object body, Boolean forward) throws Throwable {
- return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<>(), forward);
- }
-
- private interface Translator<T, U> {
- T translate(RestClient.HttpResponse<U> response);
- }
-
- private static class IdentityTranslator<T> implements Translator<T, T> {
- @Override
- public T translate(RestClient.HttpResponse<T> response) {
- return response.body();
- }
- }
-
private static class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
@Override
public Herder.Created<ConnectorInfo> translate(RestClient.HttpResponse<ConnectorInfo> response) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
new file mode 100644
index 00000000000..c7bef991b41
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.v3.oas.annotations.Operation;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.distributed.Crypto;
+import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
+import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Contains endpoints necessary for intra-cluster communication--that is, requests that
+ * workers will issue to each other that originate from within the cluster, as opposed to
+ * requests that originate from a user and are forwarded from one worker to another.
+ */
+@Produces(MediaType.APPLICATION_JSON)
+public abstract class InternalClusterResource implements ConnectResource {
+
+ private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE =
+ new TypeReference<List<Map<String, String>>>() { };
+
+ private final HerderRequestHandler requestHandler;
+
+ // Visible for testing
+ @Context
+ UriInfo uriInfo;
+
+ protected InternalClusterResource(RestClient restClient) {
+ this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS);
+ }
+
+ @Override
+ public void requestTimeout(long requestTimeoutMs) {
+ requestHandler.requestTimeoutMs(requestTimeoutMs);
+ }
+
+ /**
+ * @return a {@link Herder} instance that can be used to satisfy the current request; may not be null
+ * @throws javax.ws.rs.NotFoundException if no such herder can be provided
+ */
+ protected abstract Herder herderForRequest();
+
+ @POST
+ @Path("/{connector}/tasks")
+ @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
+ public void putTaskConfigs(
+ final @PathParam("connector") String connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean forward,
+ final byte[] requestBody) throws Throwable {
+ List<Map<String, String>> taskConfigs = new ObjectMapper().readValue(requestBody, TASK_CONFIGS_TYPE);
+ FutureCallback<Void> cb = new FutureCallback<>();
+ herderForRequest().putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
+ requestHandler.completeOrForwardRequest(
+ cb,
+ uriInfo.getPath(),
+ "POST",
+ headers,
+ taskConfigs,
+ forward
+ );
+ }
+
+ @PUT
+ @Path("/{connector}/fence")
+ @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
+ public void fenceZombies(
+ final @PathParam("connector") String connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean forward,
+ final byte[] requestBody) throws Throwable {
+ FutureCallback<Void> cb = new FutureCallback<>();
+ herderForRequest().fenceZombieSourceTasks(connector, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
+ requestHandler.completeOrForwardRequest(
+ cb,
+ uriInfo.getPath(),
+ "PUT",
+ headers,
+ requestBody,
+ forward
+ );
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
new file mode 100644
index 00000000000..c4987150dfb
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+
+import javax.ws.rs.Path;
+
+@Path("/connectors")
+public class InternalConnectResource extends InternalClusterResource {
+
+ private final Herder herder;
+
+ public InternalConnectResource(Herder herder, RestClient restClient) {
+ super(restClient);
+ this.herder = herder;
+ }
+
+ @Override
+ protected Herder herderForRequest() {
+ return herder;
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
index c913ffe747b..efb184849f8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
@@ -16,10 +16,10 @@
*/
package org.apache.kafka.connect.runtime.rest.util;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -40,7 +40,7 @@ public class SSLUtils {
/**
* Configures SSL/TLS for HTTPS Jetty Server using configs with the given prefix
*/
- public static SslContextFactory createServerSideSslContextFactory(WorkerConfig config, String prefix) {
+ public static SslContextFactory createServerSideSslContextFactory(AbstractConfig config, String prefix) {
Map<String, Object> sslConfigValues = config.valuesWithPrefixAllOrNothing(prefix);
final SslContextFactory.Server ssl = new SslContextFactory.Server();
@@ -56,14 +56,14 @@ public class SSLUtils {
/**
* Configures SSL/TLS for HTTPS Jetty Server
*/
- public static SslContextFactory createServerSideSslContextFactory(WorkerConfig config) {
+ public static SslContextFactory createServerSideSslContextFactory(AbstractConfig config) {
return createServerSideSslContextFactory(config, "listeners.https.");
}
/**
* Configures SSL/TLS for HTTPS Jetty Client
*/
- public static SslContextFactory createClientSideSslContextFactory(WorkerConfig config) {
+ public static SslContextFactory createClientSideSslContextFactory(AbstractConfig config) {
Map<String, Object> sslConfigValues = config.valuesWithPrefixAllOrNothing("listeners.https.");
final SslContextFactory.Client ssl = new SslContextFactory.Client();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index 6ec86bd9342..a0f993f2f63 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -44,7 +44,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
-import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG;
+import static org.apache.kafka.connect.runtime.rest.RestServerConfig.REST_EXTENSION_CLASSES_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
index 8046f411010..7c9e2f2c51e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
@@ -34,8 +34,9 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
@@ -80,10 +81,10 @@ public class RestForwardingIntegrationTest {
private Map<String, Object> sslConfig;
@Mock
private Plugins plugins;
- private RestServer followerServer;
+ private ConnectRestServer followerServer;
@Mock
private Herder followerHerder;
- private RestServer leaderServer;
+ private ConnectRestServer leaderServer;
@Mock
private Herder leaderHerder;
@@ -158,14 +159,14 @@ public class RestForwardingIntegrationTest {
// Follower worker setup
RestClient followerClient = new RestClient(followerConfig);
- followerServer = new RestServer(followerConfig, followerClient);
+ followerServer = new ConnectRestServer(null, followerClient, followerConfig.originals());
followerServer.initializeServer();
when(followerHerder.plugins()).thenReturn(plugins);
followerServer.initializeResources(followerHerder);
// Leader worker setup
RestClient leaderClient = new RestClient(leaderConfig);
- leaderServer = new RestServer(leaderConfig, leaderClient);
+ leaderServer = new ConnectRestServer(null, leaderClient, leaderConfig.originals());
leaderServer.initializeServer();
when(leaderHerder.plugins()).thenReturn(plugins);
leaderServer.initializeResources(leaderHerder);
@@ -235,13 +236,13 @@ public class RestForwardingIntegrationTest {
}
}
if (dualListener) {
- workerProps.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:0, https://localhost:0");
+ workerProps.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:0, https://localhost:0");
// This server is brought up with both a plaintext and an SSL listener; we use this property
// to dictate which URL it advertises to other servers when a request must be forwarded to it
// and which URL we issue requests against during testing
- workerProps.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, advertiseSSL ? "https" : "http");
+ workerProps.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, advertiseSSL ? "https" : "http");
} else {
- workerProps.put(WorkerConfig.LISTENERS_CONFIG, advertiseSSL ? "https://localhost:0" : "http://localhost:0");
+ workerProps.put(RestServerConfig.LISTENERS_CONFIG, advertiseSSL ? "https://localhost:0" : "http://localhost:0");
}
return workerProps;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
index ec9a843d4da..dbb25cdafac 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
@@ -19,8 +19,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.After;
import org.junit.Before;
@@ -33,40 +31,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.List;
-import static org.apache.kafka.connect.runtime.WorkerConfig.LISTENERS_DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
public class WorkerConfigTest {
- private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
- "add \t Cache-Control: no-cache, no-store, must-revalidate",
- "add \r X-XSS-Protection: 1; mode=block",
- "\n add Strict-Transport-Security: max-age=31536000; includeSubDomains",
- "AdD Strict-Transport-Security: \r max-age=31536000; includeSubDomains",
- "AdD \t Strict-Transport-Security : \n max-age=31536000; includeSubDomains",
- "add X-Content-Type-Options: \r nosniff",
- "Set \t X-Frame-Options: \t Deny\n ",
- "seT \t X-Cache-Info: \t not cacheable\n ",
- "seTDate \t Expires: \r 31540000000",
- "adDdate \n Last-Modified: \t 0"
- );
-
- private static final List<String> INVALID_HEADER_CONFIGS = Arrays.asList(
- "set \t",
- "badaction \t X-Frame-Options:DENY",
- "set add X-XSS-Protection:1",
- "addX-XSS-Protection",
- "X-XSS-Protection:",
- "add set X-XSS-Protection: 1",
- "add X-XSS-Protection:1 X-XSS-Protection:1 ",
- "add X-XSS-Protection",
- "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate "
- );
private static final String CLUSTER_ID = "cluster-id";
private MockedStatic<WorkerConfig> workerConfigMockedStatic;
@@ -82,105 +54,6 @@ public class WorkerConfigTest {
workerConfigMockedStatic.close();
}
- @Test
- public void testListenersConfigAllowedValues() {
- Map<String, String> props = baseProps();
-
- // no value set for "listeners"
- WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
- assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG));
-
- props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999");
- config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
- assertEquals(Arrays.asList("http://a.b:9999"), config.getList(WorkerConfig.LISTENERS_CONFIG));
-
- props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
- config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
- assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.getList(WorkerConfig.LISTENERS_CONFIG));
-
- new WorkerConfig(WorkerConfig.baseConfigDef(), props);
- }
-
- @Test
- public void testListenersConfigNotAllowedValues() {
- Map<String, String> props = baseProps();
- assertEquals(LISTENERS_DEFAULT, new WorkerConfig(WorkerConfig.baseConfigDef(), props).getList(WorkerConfig.LISTENERS_CONFIG));
-
- props.put(WorkerConfig.LISTENERS_CONFIG, "");
- ConfigException ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
- assertTrue(ce.getMessage().contains(" listeners"));
-
- props.put(WorkerConfig.LISTENERS_CONFIG, ",,,");
- ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
- assertTrue(ce.getMessage().contains(" listeners"));
-
- props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999,");
- ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
- assertTrue(ce.getMessage().contains(" listeners"));
-
- props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999");
- ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
- assertTrue(ce.getMessage().contains(" listeners"));
- }
-
- @Test
- public void testAdminListenersConfigAllowedValues() {
- Map<String, String> props = baseProps();
-
- // no value set for "admin.listeners"
- WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
- assertNull("Default value should be null.", config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG));
-
- props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "");
- config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
- assertTrue(config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG).isEmpty());
-
- props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
- config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
- assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG));
-
- new WorkerConfig(WorkerConfig.baseConfigDef(), props);
- }
-
- @Test
- public void testAdminListenersNotAllowingEmptyStrings() {
- Map<String, String> props = baseProps();
-
- props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999,");
- ConfigException ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
- assertTrue(ce.getMessage().contains(" admin.listeners"));
- }
-
- @Test
- public void testAdminListenersNotAllowingBlankStrings() {
- Map<String, String> props = baseProps();
- props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999");
- assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
- }
-
- @Test
- public void testInvalidHeaderConfigs() {
- for (String config : INVALID_HEADER_CONFIGS) {
- assertInvalidHeaderConfig(config);
- }
- }
-
- @Test
- public void testValidHeaderConfigs() {
- for (String config : VALID_HEADER_CONFIGS) {
- assertValidHeaderConfig(config);
- }
- }
-
- @Test
- public void testInvalidSslClientAuthConfig() {
- Map<String, String> props = baseProps();
-
- props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "abc");
- ConfigException ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
- assertTrue(ce.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG));
- }
-
@Test
public void testLookupKafkaClusterId() {
final Node broker1 = new Node(0, "dummyHost-1", 1234);
@@ -225,14 +98,6 @@ public class WorkerConfigTest {
workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1));
}
- private void assertInvalidHeaderConfig(String config) {
- assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config));
- }
-
- private void assertValidHeaderConfig(String config) {
- WorkerConfig.validateHttpResponseHeaderConfig(config);
- }
-
private Map<String, String> baseProps() {
Map<String, String> props = new HashMap<>();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index e387847e6be..99dffe17d9d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -244,7 +244,7 @@ public class DistributedHerderTest {
new String[]{"connectorType", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan", "recordRestarting"},
new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, configBackingStore, member, MEMBER_URL, restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
- new AutoCloseable[]{uponShutdown});
+ Collections.emptyList(), new AutoCloseable[]{uponShutdown});
configUpdateListener = herder.new ConfigUpdateListener();
rebalanceListener = herder.new RebalanceListener(time);
@@ -4020,7 +4020,7 @@ public class DistributedHerderTest {
new String[]{"connectorType", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
new DistributedConfig(config), worker, WORKER_ID, KAFKA_CLUSTER_ID,
statusBackingStore, configBackingStore, member, MEMBER_URL, restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
- new AutoCloseable[0]);
+ Collections.emptyList(), new AutoCloseable[0]);
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 4b040de92c4..ee628e113e6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
@@ -140,12 +141,13 @@ public class PluginsTest {
@Test
public void shouldInstantiateAndConfigureConnectRestExtension() {
- props.put(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG,
+ props.clear();
+ props.put(RestServerConfig.REST_EXTENSION_CLASSES_CONFIG,
TestConnectRestExtension.class.getName());
- createConfig();
+ config = RestServerConfig.forPublic(null, props);
List<ConnectRestExtension> connectRestExtensions =
- plugins.newPlugins(config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+ plugins.newPlugins(config.getList(RestServerConfig.REST_EXTENSION_CLASSES_CONFIG),
config,
ConnectRestExtension.class);
assertNotNull(connectRestExtensions);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
similarity index 70%
rename from connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
rename to connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
index e0c5b22c90e..ad5c88f9aeb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
@@ -29,14 +29,11 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -55,20 +52,21 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.kafka.connect.runtime.WorkerConfig.ADMIN_LISTENERS_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-public class RestServerTest {
+public class ConnectRestServerTest {
private Herder herder;
private Plugins plugins;
- private RestServer server;
+ private ConnectRestServer server;
private CloseableHttpClient httpClient;
private Collection<CloseableHttpResponse> responses = new ArrayList<>();
@@ -94,18 +92,10 @@ public class RestServerTest {
}
}
- private Map<String, String> baseWorkerProps() {
- Map<String, String> workerProps = new HashMap<>();
- workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
- workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
- workerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
- workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
- workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
- workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
- workerProps.put(WorkerConfig.LISTENERS_CONFIG, "HTTP://localhost:0");
-
- return workerProps;
+ private Map<String, String> baseServerProps() {
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put(RestServerConfig.LISTENERS_CONFIG, "HTTP://localhost:0");
+ return configMap;
}
@Test
@@ -121,65 +111,60 @@ public class RestServerTest {
@Test
public void testAdvertisedUri() {
// Advertised URI from listeners without protocol
- Map<String, String> configMap = new HashMap<>(baseWorkerProps());
- configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
- DistributedConfig config = new DistributedConfig(configMap);
+ Map<String, String> configMap = new HashMap<>(baseServerProps());
+ configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
- server = new RestServer(config, null);
+ server = new ConnectRestServer(null, null, configMap);
Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString());
server.stop();
// Advertised URI from listeners with protocol
- configMap = new HashMap<>(baseWorkerProps());
- configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
- configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https");
- config = new DistributedConfig(configMap);
+ configMap = new HashMap<>(baseServerProps());
+ configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
+ configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https");
- server = new RestServer(config, null);
+ server = new ConnectRestServer(null, null, configMap);
Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
server.stop();
// Advertised URI from listeners with only SSL available
- configMap = new HashMap<>(baseWorkerProps());
- configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
- config = new DistributedConfig(configMap);
+ configMap = new HashMap<>(baseServerProps());
+ configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443");
- server = new RestServer(config, null);
+ server = new ConnectRestServer(null, null, configMap);
Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
server.stop();
// Listener is overriden by advertised values
- configMap = new HashMap<>(baseWorkerProps());
- configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
- configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
- configMap.put(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost");
- configMap.put(WorkerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
- config = new DistributedConfig(configMap);
-
- server = new RestServer(config, null);
+ configMap = new HashMap<>(baseServerProps());
+ configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443");
+ configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
+ configMap.put(RestServerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost");
+ configMap.put(RestServerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
+
+ server = new ConnectRestServer(null, null, configMap);
Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString());
server.stop();
// correct listener is chosen when https listener is configured before http listener and advertised listener is http
- configMap = new HashMap<>(baseWorkerProps());
- configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761");
- configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
- config = new DistributedConfig(configMap);
- server = new RestServer(config, null);
+ configMap = new HashMap<>(baseServerProps());
+ configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761");
+ configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
+
+ server = new ConnectRestServer(null, null, configMap);
Assert.assertEquals("http://plaintext-localhost:4761/", server.advertisedUrl().toString());
server.stop();
}
@Test
public void testOptionsDoesNotIncludeWadlOutput() throws IOException {
- Map<String, String> configMap = new HashMap<>(baseWorkerProps());
- DistributedConfig workerConfig = new DistributedConfig(configMap);
+ Map<String, String> configMap = new HashMap<>(baseServerProps());
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
- doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+ expectEmptyRestExtensions();
- server = new RestServer(workerConfig, null);
+ server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -197,17 +182,16 @@ public class RestServerTest {
public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method)
throws IOException {
- Map<String, String> workerProps = baseWorkerProps();
- workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
- workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
- WorkerConfig workerConfig = new DistributedConfig(workerProps);
+ Map<String, String> configMap = baseServerProps();
+ configMap.put(RestServerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
+ configMap.put(RestServerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
- doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+ expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
- server = new RestServer(workerConfig, null);
+ server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
server.initializeResources(herder);
URI serverUrl = server.advertisedUrl();
@@ -242,16 +226,15 @@ public class RestServerTest {
@Test
public void testStandaloneConfig() throws IOException {
- Map<String, String> workerProps = baseWorkerProps();
- workerProps.put("offset.storage.file.filename", "/tmp");
- WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+ Map<String, String> configMap = baseServerProps();
+ configMap.put("offset.storage.file.filename", "/tmp");
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
- doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+ expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
- server = new RestServer(workerConfig, null);
+ server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");
@@ -262,17 +245,16 @@ public class RestServerTest {
@Test
public void testLoggersEndpointWithDefaults() throws IOException {
- Map<String, String> configMap = new HashMap<>(baseWorkerProps());
- DistributedConfig workerConfig = new DistributedConfig(configMap);
+ Map<String, String> configMap = new HashMap<>(baseServerProps());
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
- doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+ expectEmptyRestExtensions();
// create some loggers in the process
LoggerFactory.getLogger("a.b.c.s.W");
- server = new RestServer(workerConfig, null);
+ server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -292,14 +274,12 @@ public class RestServerTest {
@Test
public void testIndependentAdminEndpoint() throws IOException {
- Map<String, String> configMap = new HashMap<>(baseWorkerProps());
- configMap.put(ADMIN_LISTENERS_CONFIG, "http://localhost:0");
-
- DistributedConfig workerConfig = new DistributedConfig(configMap);
+ Map<String, String> configMap = new HashMap<>(baseServerProps());
+ configMap.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://localhost:0");
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
- doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+ expectEmptyRestExtensions();
// create some loggers in the process
LoggerFactory.getLogger("a.b.c.s.W");
@@ -307,7 +287,7 @@ public class RestServerTest {
LoggerFactory.getLogger("a.b.c.p.Y");
LoggerFactory.getLogger("a.b.c.p.Z");
- server = new RestServer(workerConfig, null);
+ server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -322,16 +302,14 @@ public class RestServerTest {
@Test
public void testDisableAdminEndpoint() throws IOException {
- Map<String, String> configMap = new HashMap<>(baseWorkerProps());
- configMap.put(ADMIN_LISTENERS_CONFIG, "");
-
- DistributedConfig workerConfig = new DistributedConfig(configMap);
+ Map<String, String> configMap = new HashMap<>(baseServerProps());
+ configMap.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "");
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
- doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+ expectEmptyRestExtensions();
- server = new RestServer(workerConfig, null);
+ server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -344,14 +322,13 @@ public class RestServerTest {
@Test
public void testRequestLogs() throws IOException, InterruptedException {
- Map<String, String> configMap = new HashMap<>(baseWorkerProps());
- DistributedConfig workerConfig = new DistributedConfig(configMap);
+ Map<String, String> configMap = new HashMap<>(baseServerProps());
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
- doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+ expectEmptyRestExtensions();
- server = new RestServer(workerConfig, null);
+ server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -388,17 +365,16 @@ public class RestServerTest {
private void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
throws IOException {
- Map<String, String> workerProps = baseWorkerProps();
- workerProps.put("offset.storage.file.filename", "/tmp");
- workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
- WorkerConfig workerConfig = new DistributedConfig(workerProps);
+ Map<String, String> configMap = baseServerProps();
+ configMap.put("offset.storage.file.filename", "/tmp");
+ configMap.put(RestServerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
doReturn(plugins).when(herder).plugins();
- doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+ expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
- server = new RestServer(workerConfig, null);
+ server = new ConnectRestServer(null, null, configMap);
server.initializeServer();
server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");
@@ -442,4 +418,12 @@ public class RestServerTest {
ObjectMapper mapper = new ObjectMapper();
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map);
}
+
+ private void expectEmptyRestExtensions() {
+ doReturn(Collections.emptyList()).when(plugins).newPlugins(
+ eq(Collections.emptyList()),
+ any(AbstractConfig.class),
+ eq(ConnectRestExtension.class)
+ );
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
new file mode 100644
index 00000000000..28dd725afd4
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class RestServerConfigTest {
+
+ private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
+ "add \t Cache-Control: no-cache, no-store, must-revalidate",
+ "add \r X-XSS-Protection: 1; mode=block",
+ "\n add Strict-Transport-Security: max-age=31536000; includeSubDomains",
+ "AdD Strict-Transport-Security: \r max-age=31536000; includeSubDomains",
+ "AdD \t Strict-Transport-Security : \n max-age=31536000; includeSubDomains",
+ "add X-Content-Type-Options: \r nosniff",
+ "Set \t X-Frame-Options: \t Deny\n ",
+ "seT \t X-Cache-Info: \t not cacheable\n ",
+ "seTDate \t Expires: \r 31540000000",
+ "adDdate \n Last-Modified: \t 0"
+ );
+
+ private static final List<String> INVALID_HEADER_CONFIGS = Arrays.asList(
+ "set \t",
+ "badaction \t X-Frame-Options:DENY",
+ "set add X-XSS-Protection:1",
+ "addX-XSS-Protection",
+ "X-XSS-Protection:",
+ "add set X-XSS-Protection: 1",
+ "add X-XSS-Protection:1 X-XSS-Protection:1 ",
+ "add X-XSS-Protection",
+ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate "
+ );
+
+ @Test
+ public void testListenersConfigAllowedValues() {
+ Map<String, String> props = new HashMap<>();
+
+ // no value set for "listeners"
+ RestServerConfig config = RestServerConfig.forPublic(null, props);
+ assertEquals(LISTENERS_DEFAULT, config.listeners());
+
+ props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999");
+ config = RestServerConfig.forPublic(null, props);
+ assertEquals(Arrays.asList("http://a.b:9999"), config.listeners());
+
+ props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
+ config = RestServerConfig.forPublic(null, props);
+ assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.listeners());
+
+ config = RestServerConfig.forPublic(null, props);
+ }
+
+ @Test
+ public void testListenersConfigNotAllowedValues() {
+ Map<String, String> props = new HashMap<>();
+ assertEquals(LISTENERS_DEFAULT, RestServerConfig.forPublic(null, props).listeners());
+
+ props.put(RestServerConfig.LISTENERS_CONFIG, "");
+ ConfigException ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+ assertTrue(ce.getMessage().contains(" listeners"));
+
+ props.put(RestServerConfig.LISTENERS_CONFIG, ",,,");
+ ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+ assertTrue(ce.getMessage().contains(" listeners"));
+
+ props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999,");
+ ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+ assertTrue(ce.getMessage().contains(" listeners"));
+
+ props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999");
+ ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+ assertTrue(ce.getMessage().contains(" listeners"));
+ }
+
+ @Test
+ public void testAdminListenersConfigAllowedValues() {
+ Map<String, String> props = new HashMap<>();
+
+ // no value set for "admin.listeners"
+ RestServerConfig config = RestServerConfig.forPublic(null, props);
+ assertNull("Default value should be null.", config.adminListeners());
+
+ props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "");
+ config = RestServerConfig.forPublic(null, props);
+ assertTrue(config.adminListeners().isEmpty());
+
+ props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
+ config = RestServerConfig.forPublic(null, props);
+ assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.adminListeners());
+
+ RestServerConfig.forPublic(null, props);
+ }
+
+ @Test
+ public void testAdminListenersNotAllowingEmptyStrings() {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999,");
+ ConfigException ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+ assertTrue(ce.getMessage().contains(" admin.listeners"));
+ }
+
+ @Test
+ public void testAdminListenersNotAllowingBlankStrings() {
+ Map<String, String> props = new HashMap<>();
+ props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999");
+ assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+ }
+
+ @Test
+ public void testInvalidHeaderConfigs() {
+ for (String config : INVALID_HEADER_CONFIGS) {
+ assertInvalidHeaderConfig(config);
+ }
+ }
+
+ @Test
+ public void testValidHeaderConfigs() {
+ for (String config : VALID_HEADER_CONFIGS) {
+ assertValidHeaderConfig(config);
+ }
+ }
+
+ private void assertInvalidHeaderConfig(String config) {
+ assertThrows(ConfigException.class, () -> RestServerConfig.validateHttpResponseHeaderConfig(config));
+ }
+
+ private void assertValidHeaderConfig(String config) {
+ RestServerConfig.validateHttpResponseHeaderConfig(config);
+ }
+
+ @Test
+ public void testInvalidSslClientAuthConfig() {
+ Map<String, String> props = new HashMap<>();
+
+ props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "abc");
+ ConfigException ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+ assertTrue(ce.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG));
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 781a09d72ae..26327770e0c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.runtime.rest.resources;
-import javax.crypto.Mac;
import javax.ws.rs.core.HttpHeaders;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -26,12 +25,11 @@ import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
-import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -61,7 +59,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -70,8 +67,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -153,15 +148,15 @@ public class ConnectorsResourceTest {
private ConnectorsResource connectorsResource;
private UriInfo forward;
@Mock
- private WorkerConfig workerConfig;
- @Mock
private RestClient restClient;
+ @Mock
+ private RestServerConfig serverConfig;
@Before
public void setUp() throws NoSuchMethodException {
- when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
- when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
- connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+ when(serverConfig.topicTrackingEnabled()).thenReturn(true);
+ when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
+ connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
forward = mock(UriInfo.class);
MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
queryParams.putSingle("forward", "true");
@@ -565,66 +560,6 @@ public class ConnectorsResourceTest {
assertThrows(NotFoundException.class, () -> connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD));
}
- @Test
- public void testPutConnectorTaskConfigsNoInternalRequestSignature() throws Throwable {
- final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
- expectAndCallbackResult(cb, null).when(herder).putTaskConfigs(
- eq(CONNECTOR_NAME),
- eq(TASK_CONFIGS),
- cb.capture(),
- any()
- );
-
- connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(TASK_CONFIGS));
- }
-
- @Test
- public void testPutConnectorTaskConfigsWithInternalRequestSignature() throws Throwable {
- final String signatureAlgorithm = "HmacSHA256";
- final String encodedSignature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4=";
-
- final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
- final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class);
- expectAndCallbackResult(cb, null).when(herder).putTaskConfigs(
- eq(CONNECTOR_NAME),
- eq(TASK_CONFIGS),
- cb.capture(),
- signatureCapture.capture()
- );
-
- HttpHeaders headers = mock(HttpHeaders.class);
- when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER))
- .thenReturn(signatureAlgorithm);
- when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER))
- .thenReturn(encodedSignature);
-
- connectorsResource.putTaskConfigs(CONNECTOR_NAME, headers, FORWARD, serializeAsBytes(TASK_CONFIGS));
-
- InternalRequestSignature expectedSignature = new InternalRequestSignature(
- serializeAsBytes(TASK_CONFIGS),
- Mac.getInstance(signatureAlgorithm),
- Base64.getDecoder().decode(encodedSignature)
- );
- assertEquals(
- expectedSignature,
- signatureCapture.getValue()
- );
- }
-
- @Test
- public void testPutConnectorTaskConfigsConnectorNotFound() {
- final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
- expectAndCallbackException(cb, new NotFoundException("not found")).when(herder).putTaskConfigs(
- eq(CONNECTOR_NAME),
- eq(TASK_CONFIGS),
- cb.capture(),
- any()
- );
-
- assertThrows(NotFoundException.class, () -> connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS,
- FORWARD, serializeAsBytes(TASK_CONFIGS)));
- }
-
@Test
public void testRestartConnectorAndTasksConnectorNotFound() {
RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, true, false);
@@ -683,55 +618,6 @@ public class ConnectorsResourceTest {
assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
}
- @Test
- public void testFenceZombiesNoInternalRequestSignature() throws Throwable {
- final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
- expectAndCallbackResult(cb, null)
- .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), isNull());
-
- connectorsResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null));
- }
-
- @Test
- public void testFenceZombiesWithInternalRequestSignature() throws Throwable {
- final String signatureAlgorithm = "HmacSHA256";
- final String encodedSignature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4=";
-
- final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
- final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class);
- expectAndCallbackResult(cb, null)
- .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), signatureCapture.capture());
-
- HttpHeaders headers = mock(HttpHeaders.class);
- when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER))
- .thenReturn(signatureAlgorithm);
- when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER))
- .thenReturn(encodedSignature);
-
- connectorsResource.fenceZombies(CONNECTOR_NAME, headers, FORWARD, serializeAsBytes(null));
-
- InternalRequestSignature expectedSignature = new InternalRequestSignature(
- serializeAsBytes(null),
- Mac.getInstance(signatureAlgorithm),
- Base64.getDecoder().decode(encodedSignature)
- );
- assertEquals(
- expectedSignature,
- signatureCapture.getValue()
- );
- }
-
- @Test
- public void testFenceZombiesConnectorNotFound() throws Throwable {
- final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
-
- expectAndCallbackException(cb, new NotFoundException("not found"))
- .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), any());
-
- assertThrows(NotFoundException.class,
- () -> connectorsResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null)));
- }
-
@Test
public void testRestartConnectorNotFound() {
final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
@@ -806,9 +692,9 @@ public class ConnectorsResourceTest {
@Test
public void testConnectorActiveTopicsWithTopicTrackingDisabled() {
- when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false);
- when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false);
- connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+ when(serverConfig.topicTrackingEnabled()).thenReturn(false);
+ when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
+ connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME));
@@ -817,10 +703,10 @@ public class ConnectorsResourceTest {
@Test
public void testResetConnectorActiveTopicsWithTopicTrackingDisabled() {
- when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false);
- when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
+ when(serverConfig.topicTrackingEnabled()).thenReturn(false);
+ when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
HttpHeaders headers = mock(HttpHeaders.class);
- connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@@ -829,10 +715,10 @@ public class ConnectorsResourceTest {
@Test
public void testResetConnectorActiveTopicsWithTopicTrackingEnabled() {
- when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
- when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false);
+ when(serverConfig.topicTrackingEnabled()).thenReturn(true);
+ when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
HttpHeaders headers = mock(HttpHeaders.class);
- connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@@ -841,11 +727,11 @@ public class ConnectorsResourceTest {
@Test
public void testConnectorActiveTopics() {
- when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
- when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
+ when(serverConfig.topicTrackingEnabled()).thenReturn(true);
+ when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
when(herder.connectorActiveTopics(CONNECTOR_NAME))
.thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS));
- connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
@@ -858,7 +744,7 @@ public class ConnectorsResourceTest {
@Test
public void testResetConnectorActiveTopics() {
HttpHeaders headers = mock(HttpHeaders.class);
- connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers);
verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
new file mode 100644
index 00000000000..5bf33bf5300
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.util.Callback;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Stubber;
+
+import javax.crypto.Mac;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class InternalConnectResourceTest {
+
+ private static final Boolean FORWARD = true;
+ private static final String CONNECTOR_NAME = "test";
+ private static final HttpHeaders NULL_HEADERS = null;
+ private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();
+ static {
+ TASK_CONFIGS.add(Collections.singletonMap("config", "value"));
+ TASK_CONFIGS.add(Collections.singletonMap("config", "other_value"));
+ }
+ private static final String FENCE_PATH = "/connectors/" + CONNECTOR_NAME + "/fence";
+ private static final String TASK_CONFIGS_PATH = "/connectors/" + CONNECTOR_NAME + "/tasks";
+
+ @Mock
+ private UriInfo uriInfo;
+ @Mock
+ private Herder herder;
+ @Mock
+ private RestClient restClient;
+
+ private InternalConnectResource internalResource;
+
+ @Before
+ public void setup() {
+ internalResource = new InternalConnectResource(herder, restClient);
+ internalResource.uriInfo = uriInfo;
+ }
+
+ @Test
+ public void testPutConnectorTaskConfigsNoInternalRequestSignature() throws Throwable {
+ @SuppressWarnings("unchecked")
+ final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
+ expectAndCallbackResult(cb, null).when(herder).putTaskConfigs(
+ eq(CONNECTOR_NAME),
+ eq(TASK_CONFIGS),
+ cb.capture(),
+ any()
+ );
+ expectRequestPath(TASK_CONFIGS_PATH);
+
+ internalResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(TASK_CONFIGS));
+ }
+
+ @Test
+ public void testPutConnectorTaskConfigsWithInternalRequestSignature() throws Throwable {
+ final String signatureAlgorithm = "HmacSHA256";
+ final String encodedSignature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4=";
+
+ @SuppressWarnings("unchecked")
+ final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
+ final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class);
+ expectAndCallbackResult(cb, null).when(herder).putTaskConfigs(
+ eq(CONNECTOR_NAME),
+ eq(TASK_CONFIGS),
+ cb.capture(),
+ signatureCapture.capture()
+ );
+
+ HttpHeaders headers = mock(HttpHeaders.class);
+ when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER))
+ .thenReturn(signatureAlgorithm);
+ when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER))
+ .thenReturn(encodedSignature);
+ expectRequestPath(TASK_CONFIGS_PATH);
+
+ internalResource.putTaskConfigs(CONNECTOR_NAME, headers, FORWARD, serializeAsBytes(TASK_CONFIGS));
+
+ InternalRequestSignature expectedSignature = new InternalRequestSignature(
+ serializeAsBytes(TASK_CONFIGS),
+ Mac.getInstance(signatureAlgorithm),
+ Base64.getDecoder().decode(encodedSignature)
+ );
+ assertEquals(
+ expectedSignature,
+ signatureCapture.getValue()
+ );
+ }
+
+ @Test
+ public void testPutConnectorTaskConfigsConnectorNotFound() {
+ @SuppressWarnings("unchecked")
+ final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
+ expectAndCallbackException(cb, new NotFoundException("not found")).when(herder).putTaskConfigs(
+ eq(CONNECTOR_NAME),
+ eq(TASK_CONFIGS),
+ cb.capture(),
+ any()
+ );
+ expectRequestPath(TASK_CONFIGS_PATH);
+
+ assertThrows(NotFoundException.class, () -> internalResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS,
+ FORWARD, serializeAsBytes(TASK_CONFIGS)));
+ }
+
+ @Test
+ public void testFenceZombiesNoInternalRequestSignature() throws Throwable {
+ @SuppressWarnings("unchecked")
+ final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
+ expectAndCallbackResult(cb, null)
+ .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), isNull());
+ expectRequestPath(FENCE_PATH);
+
+ internalResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null));
+ }
+
+ @Test
+ public void testFenceZombiesWithInternalRequestSignature() throws Throwable {
+ final String signatureAlgorithm = "HmacSHA256";
+ final String encodedSignature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4=";
+
+ @SuppressWarnings("unchecked")
+ final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
+ final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class);
+ expectAndCallbackResult(cb, null)
+ .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), signatureCapture.capture());
+
+ HttpHeaders headers = mock(HttpHeaders.class);
+ when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER))
+ .thenReturn(signatureAlgorithm);
+ when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER))
+ .thenReturn(encodedSignature);
+ expectRequestPath(FENCE_PATH);
+
+ internalResource.fenceZombies(CONNECTOR_NAME, headers, FORWARD, serializeAsBytes(null));
+
+ InternalRequestSignature expectedSignature = new InternalRequestSignature(
+ serializeAsBytes(null),
+ Mac.getInstance(signatureAlgorithm),
+ Base64.getDecoder().decode(encodedSignature)
+ );
+ assertEquals(
+ expectedSignature,
+ signatureCapture.getValue()
+ );
+ }
+
+ @Test
+ public void testFenceZombiesConnectorNotFound() throws Throwable {
+ @SuppressWarnings("unchecked")
+ final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
+
+ expectAndCallbackException(cb, new NotFoundException("not found"))
+ .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), any());
+ expectRequestPath(FENCE_PATH);
+
+ assertThrows(NotFoundException.class,
+ () -> internalResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null)));
+ }
+
+ private <T> byte[] serializeAsBytes(final T value) throws IOException {
+ return new ObjectMapper().writeValueAsBytes(value);
+ }
+
+ private <T> Stubber expectAndCallbackResult(final ArgumentCaptor<Callback<T>> cb, final T value) {
+ return doAnswer(invocation -> {
+ cb.getValue().onCompletion(null, value);
+ return null;
+ });
+ }
+
+ private <T> Stubber expectAndCallbackException(final ArgumentCaptor<Callback<T>> cb, final Throwable t) {
+ return doAnswer(invocation -> {
+ cb.getValue().onCompletion(t, null);
+ return null;
+ });
+ }
+
+ private void expectRequestPath(String path) {
+ when(uriInfo.getPath()).thenReturn(path);
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
index b8ffbcffbed..8d486bbb9a3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
@@ -16,11 +16,8 @@
*/
package org.apache.kafka.connect.runtime.rest.util;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
@@ -31,17 +28,6 @@ import java.util.Map;
@SuppressWarnings("deprecation")
public class SSLUtilsTest {
- private static final Map<String, String> DEFAULT_CONFIG = new HashMap<>();
- static {
- // The WorkerConfig base class has some required settings without defaults
- DEFAULT_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
- DEFAULT_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
- DEFAULT_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- DEFAULT_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
- DEFAULT_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
- DEFAULT_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
- DEFAULT_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
- }
@Test
public void testGetOrDefault() {
@@ -58,7 +44,7 @@ public class SSLUtilsTest {
@Test
public void testCreateServerSideSslContextFactory() {
- Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
+ Map<String, String> configMap = new HashMap<>();
configMap.put("ssl.keystore.location", "/path/to/keystore");
configMap.put("ssl.keystore.password", "123456");
configMap.put("ssl.key.password", "123456");
@@ -76,7 +62,7 @@ public class SSLUtilsTest {
configMap.put("ssl.keymanager.algorithm", "SunX509");
configMap.put("ssl.trustmanager.algorithm", "PKIX");
- DistributedConfig config = new DistributedConfig(configMap);
+ RestServerConfig config = RestServerConfig.forPublic(null, configMap);
SslContextFactory ssl = SSLUtils.createServerSideSslContextFactory(config);
Assert.assertEquals("file:///path/to/keystore", ssl.getKeyStorePath());
@@ -96,7 +82,7 @@ public class SSLUtilsTest {
@Test
public void testCreateClientSideSslContextFactory() {
- Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
+ Map<String, String> configMap = new HashMap<>();
configMap.put("ssl.keystore.location", "/path/to/keystore");
configMap.put("ssl.keystore.password", "123456");
configMap.put("ssl.key.password", "123456");
@@ -114,7 +100,7 @@ public class SSLUtilsTest {
configMap.put("ssl.keymanager.algorithm", "SunX509");
configMap.put("ssl.trustmanager.algorithm", "PKIX");
- DistributedConfig config = new DistributedConfig(configMap);
+ RestServerConfig config = RestServerConfig.forPublic(null, configMap);
SslContextFactory ssl = SSLUtils.createClientSideSslContextFactory(config);
Assert.assertEquals("file:///path/to/keystore", ssl.getKeyStorePath());
@@ -134,10 +120,7 @@ public class SSLUtilsTest {
@Test
public void testCreateServerSideSslContextFactoryDefaultValues() {
- Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
- configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file");
- configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
- configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ Map<String, String> configMap = new HashMap<>();
configMap.put("ssl.keystore.location", "/path/to/keystore");
configMap.put("ssl.keystore.password", "123456");
configMap.put("ssl.key.password", "123456");
@@ -147,7 +130,7 @@ public class SSLUtilsTest {
configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5");
configMap.put("ssl.secure.random.implementation", "SHA1PRNG");
- DistributedConfig config = new DistributedConfig(configMap);
+ RestServerConfig config = RestServerConfig.forPublic(null, configMap);
SslContextFactory ssl = SSLUtils.createServerSideSslContextFactory(config);
Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ssl.getKeyStoreType());
@@ -162,10 +145,7 @@ public class SSLUtilsTest {
@Test
public void testCreateClientSideSslContextFactoryDefaultValues() {
- Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
- configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file");
- configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
- configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+ Map<String, String> configMap = new HashMap<>();
configMap.put("ssl.keystore.location", "/path/to/keystore");
configMap.put("ssl.keystore.password", "123456");
configMap.put("ssl.key.password", "123456");
@@ -175,7 +155,7 @@ public class SSLUtilsTest {
configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5");
configMap.put("ssl.secure.random.implementation", "SHA1PRNG");
- DistributedConfig config = new DistributedConfig(configMap);
+ RestServerConfig config = RestServerConfig.forPublic(null, configMap);
SslContextFactory ssl = SSLUtils.createClientSideSslContextFactory(config);
Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ssl.getKeyStoreType());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 9b91b55e92f..9687fb8c1f0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -54,7 +54,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.connect.runtime.WorkerConfig.LISTENERS_CONFIG;
+import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG;