You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/09 15:50:15 UTC

[kafka] branch trunk updated: KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2 (#13137)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f93d5af8392 KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2 (#13137)
f93d5af8392 is described below

commit f93d5af8392e01f29b853cc44c6f78d74ead41be
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Thu Feb 9 10:50:07 2023 -0500

    KAFKA-15086, KAFKA-9981: Intra-cluster communication for Mirror Maker 2 (#13137)
    
    Reviewers: Daniel Urban <du...@cloudera.com>, Greg Harris <gr...@aiven.io>, Viktor Somogyi-Vass <vi...@gmail.com>, Mickael Maison <mi...@gmail.com>
---
 build.gradle                                       |  11 +
 checkstyle/import-control.xml                      |   3 +
 checkstyle/suppressions.xml                        |   2 +-
 .../apache/kafka/connect/mirror/MirrorMaker.java   |  65 ++-
 .../kafka/connect/mirror/MirrorMakerConfig.java    |  41 +-
 .../connect/mirror/rest/MirrorRestServer.java      |  59 +++
 .../rest/resources/InternalMirrorResource.java     |  65 +++
 .../DedicatedMirrorIntegrationTest.java            | 290 +++++++++++++
 .../kafka/connect/cli/AbstractConnectCli.java      |   3 +-
 .../kafka/connect/cli/ConnectDistributed.java      |   4 +-
 .../org/apache/kafka/connect/runtime/Connect.java  |   5 +-
 .../apache/kafka/connect/runtime/WorkerConfig.java | 229 +---------
 .../runtime/distributed/DistributedConfig.java     |   2 +-
 .../runtime/distributed/DistributedHerder.java     |  50 ++-
 .../connect/runtime/rest/ConnectRestServer.java    |  69 +++
 .../connect/runtime/rest/HerderRequestHandler.java | 142 +++++++
 .../kafka/connect/runtime/rest/RestClient.java     |   7 +-
 .../kafka/connect/runtime/rest/RestServer.java     | 120 ++++--
 .../connect/runtime/rest/RestServerConfig.java     | 463 +++++++++++++++++++++
 .../runtime/rest/resources/ConnectorsResource.java | 169 +-------
 .../rest/resources/InternalClusterResource.java    | 115 +++++
 .../rest/resources/InternalConnectResource.java    |  39 ++
 .../kafka/connect/runtime/rest/util/SSLUtils.java  |   8 +-
 .../integration/RestExtensionIntegrationTest.java  |   2 +-
 .../integration/RestForwardingIntegrationTest.java |  17 +-
 .../kafka/connect/runtime/WorkerConfigTest.java    | 135 ------
 .../runtime/distributed/DistributedHerderTest.java |   4 +-
 .../connect/runtime/isolation/PluginsTest.java     |   8 +-
 ...tServerTest.java => ConnectRestServerTest.java} | 156 ++++---
 .../connect/runtime/rest/RestServerConfigTest.java | 167 ++++++++
 .../rest/resources/ConnectorsResourceTest.java     | 152 +------
 .../resources/InternalConnectResourceTest.java     | 224 ++++++++++
 .../connect/runtime/rest/util/SSLUtilsTest.java    |  38 +-
 .../util/clusters/EmbeddedConnectCluster.java      |   2 +-
 34 files changed, 2014 insertions(+), 852 deletions(-)

diff --git a/build.gradle b/build.gradle
index 92f2fd1970d..b0b7e7f91dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2865,6 +2865,17 @@ project(':connect:mirror') {
     implementation libs.argparse4j
     implementation libs.jacksonAnnotations
     implementation libs.slf4jApi
+    implementation libs.jacksonAnnotations
+    implementation libs.jacksonJaxrsJsonProvider
+    implementation libs.jerseyContainerServlet
+    implementation libs.jerseyHk2
+    implementation libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9
+    implementation libs.activation // Jersey dependency that was available in the JDK before Java 9
+    implementation libs.jettyServer
+    implementation libs.jettyServlet
+    implementation libs.jettyServlets
+    implementation libs.jettyClient
+    implementation libs.swaggerAnnotations
 
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index aa9917dfe6e..d787d4cbb4b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -594,6 +594,9 @@
       <allow pkg="org.apache.kafka.connect.integration" />
       <allow pkg="org.apache.kafka.connect.mirror" />
       <allow pkg="kafka.server" />
+      <subpackage name="rest">
+        <allow pkg="javax.ws.rs" />
+      </subpackage>
     </subpackage>
 
     <subpackage name="runtime">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index fefd97e90d1..74358a2b3a9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -138,7 +138,7 @@
     <suppress checks="ParameterNumber"
               files="Worker(SinkTask|SourceTask|Coordinator).java"/>
     <suppress checks="ParameterNumber"
-              files="ConfigKeyInfo.java"/>
+              files="(ConfigKeyInfo|DistributedHerder).java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
               files="(RestServer|AbstractHerder|DistributedHerder|Worker).java"/>
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index ae39c6cd79a..d33d6b4f686 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.connect.mirror.rest.MirrorRestServer;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.Worker;
@@ -26,6 +27,7 @@ import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
@@ -46,6 +48,9 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.ArgumentParsers;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,11 +106,13 @@ public class MirrorMaker {
     private CountDownLatch stopLatch;
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
     private final ShutdownHook shutdownHook;
-    private final String advertisedBaseUrl;
+    private final String advertisedUrl;
     private final Time time;
     private final MirrorMakerConfig config;
     private final Set<String> clusters;
     private final Set<SourceAndTarget> herderPairs;
+    private final MirrorRestServer internalServer;
+    private final RestClient restClient;
 
     /**
      * @param config    MM2 configuration from mm2.properties file
@@ -117,7 +124,16 @@ public class MirrorMaker {
     public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time time) {
         log.debug("Kafka MirrorMaker instance created");
         this.time = time;
-        this.advertisedBaseUrl = "NOTUSED";
+        if (config.enableInternalRest()) {
+            this.restClient = new RestClient(config);
+            internalServer = new MirrorRestServer(config.originals(), restClient);
+            internalServer.initializeServer();
+            this.advertisedUrl = internalServer.advertisedUrl().toString();
+        } else {
+            internalServer = null;
+            restClient = null;
+            this.advertisedUrl = "NOTUSED";
+        }
         this.config = config;
         if (clusters != null && !clusters.isEmpty()) {
             this.clusters = new HashSet<>(clusters);
@@ -171,6 +187,10 @@ public class MirrorMaker {
                 startLatch.countDown();
             }
         }
+        if (internalServer != null) {
+            log.info("Initializing internal REST resources");
+            internalServer.initializeInternalResources(herders);
+        }
         log.info("Configuring connectors...");
         herderPairs.forEach(this::configureConnectors);
         log.info("Kafka MirrorMaker started");
@@ -180,6 +200,9 @@ public class MirrorMaker {
         boolean wasShuttingDown = shutdown.getAndSet(true);
         if (!wasShuttingDown) {
             log.info("Kafka MirrorMaker stopping");
+            if (internalServer != null) {
+                Utils.closeQuietly(internalServer::stop, "Internal REST server");
+            }
             for (Herder herder : herders.values()) {
                 try {
                     herder.stop();
@@ -204,11 +227,13 @@ public class MirrorMaker {
         Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass);
         herders.get(sourceAndTarget)
                 .putConnectorConfig(connectorClass.getSimpleName(), connectorProps, true, (e, x) -> {
-                    if (e instanceof NotLeaderException) {
-                        // No way to determine if the connector is a leader or not beforehand.
-                        log.info("Connector {} is a follower. Using existing configuration.", sourceAndTarget);
+                    if (e == null) {
+                        log.info("{} connector configured for {}.", connectorClass.getSimpleName(), sourceAndTarget);
+                    } else if (e instanceof NotLeaderException) {
+                        // No way to determine if the herder is a leader or not beforehand.
+                        log.info("This node is a follower for {}. Using existing connector configuration.", sourceAndTarget);
                     } else {
-                        log.info("Connector {} configured.", sourceAndTarget, e);
+                        log.error("Failed to configure {} connector for {}", connectorClass.getSimpleName(), sourceAndTarget, e);
                     }
                 });
     }
@@ -226,7 +251,14 @@ public class MirrorMaker {
     private void addHerder(SourceAndTarget sourceAndTarget) {
         log.info("creating herder for " + sourceAndTarget.toString());
         Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
-        String advertisedUrl = advertisedBaseUrl + "/" + sourceAndTarget.source();
+        List<String> restNamespace;
+        try {
+            String encodedSource = encodePath(sourceAndTarget.source());
+            String encodedTarget = encodePath(sourceAndTarget.target());
+            restNamespace = Arrays.asList(encodedSource, encodedTarget);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException("Unable to create encoded URL paths for source and target using UTF-8", e);
+        }
         String workerId = sourceAndTarget.toString();
         Plugins plugins = new Plugins(workerProps);
         plugins.compareAndSwapWithDelegatingLoader();
@@ -255,13 +287,26 @@ public class MirrorMaker {
         // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
         // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
         // tracking the various shared admin objects in this class.
-        // Do not provide a restClient to the DistributedHerder to indicate that request forwarding is disabled
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, null, clientConfigOverridePolicy, sharedAdmin);
+                advertisedUrl, restClient, clientConfigOverridePolicy,
+                restNamespace, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
 
+    private static String encodePath(String rawPath) throws UnsupportedEncodingException {
+        return URLEncoder.encode(rawPath, StandardCharsets.UTF_8.name())
+                // Java's out-of-the-box URL encoder encodes spaces (' ') as pluses ('+'),
+                // and pluses as '%2B'
+                // But Jetty doesn't decode pluses at all and leaves them as-are in decoded
+                // URLs
+                // So to get around that, we replace pluses in the encoded URL here with '%20',
+                // which is the encoding that Jetty expects for spaces
+                // Jetty will reverse this transformation when evaluating the path parameters
+                // and will return decoded strings with all special characters as they were.
+                .replaceAll("\\+", "%20");
+    }
+
     private class ShutdownHook extends Thread {
         @Override
         public void run() {
@@ -300,7 +345,7 @@ public class MirrorMaker {
 
             Properties props = Utils.loadProps(configFile.getPath());
             Map<String, String> config = Utils.propsToStringMap(props);
-            MirrorMaker mirrorMaker = new MirrorMaker(config, clusters, Time.SYSTEM);
+            MirrorMaker mirrorMaker = new MirrorMaker(config, clusters);
             
             try {
                 mirrorMaker.start();
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 19b4e4c4f77..85f4e9d79e4 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 
 import java.util.Map;
 import java.util.HashMap;
@@ -81,11 +82,14 @@ public class MirrorMakerConfig extends AbstractConfig {
     static final String TARGET_CLUSTER_PREFIX = "target.cluster.";
     static final String SOURCE_PREFIX = "source.";
     static final String TARGET_PREFIX = "target.";
+    static final String ENABLE_INTERNAL_REST_CONFIG = "dedicated.mode.enable.internal.rest";
+    private static final String ENABLE_INTERNAL_REST_DOC =
+            "Whether to bring up an internal-only REST server that allows multi-node clusters to operate correctly.";
 
     private final Plugins plugins;
    
     public MirrorMakerConfig(Map<?, ?> props) {
-        super(CONFIG_DEF, props, true);
+        super(config(), props, true);
         plugins = new Plugins(originalsStrings());
     }
 
@@ -93,6 +97,10 @@ public class MirrorMakerConfig extends AbstractConfig {
         return new HashSet<>(getList(CLUSTERS_CONFIG));
     }
 
+    public boolean enableInternalRest() {
+        return getBoolean(ENABLE_INTERNAL_REST_CONFIG);
+    }
+
     public List<SourceAndTarget> clusterPairs() {
         List<SourceAndTarget> pairs = new ArrayList<>();
         Set<String> clusters = clusters();
@@ -272,20 +280,25 @@ public class MirrorMakerConfig extends AbstractConfig {
         providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
         return transformed;
     }
- 
-    protected static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC)
-            .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC)
-            // security support
-            .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
-                Type.STRING,
-                CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
-                in(Utils.enumOptions(SecurityProtocol.class)),
-                Importance.MEDIUM,
-                CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-            .withClientSslSupport()
-            .withClientSaslSupport();
 
+    protected static ConfigDef config() {
+        ConfigDef result = new ConfigDef()
+                .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC)
+                .define(ENABLE_INTERNAL_REST_CONFIG, Type.BOOLEAN, false, Importance.HIGH, ENABLE_INTERNAL_REST_DOC)
+                .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC)
+                // security support
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+                        Type.STRING,
+                        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+                        in(Utils.enumOptions(SecurityProtocol.class)),
+                        Importance.MEDIUM,
+                        CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .withClientSslSupport()
+                .withClientSaslSupport();
+        RestServerConfig.addInternalConfig(result);
+        return result;
+    }
+ 
     private Map<String, String> stringsWithPrefixStripped(String prefix) {
         return originalsStrings().entrySet().stream()
             .filter(x -> x.getKey().startsWith(prefix))
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
new file mode 100644
index 00000000000..7f1fe2841a3
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.mirror.rest.resources.InternalMirrorResource;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+public class MirrorRestServer extends RestServer {
+
+    private final RestClient restClient;
+    private Map<SourceAndTarget, Herder> herders;
+
+    public MirrorRestServer(Map<?, ?> props, RestClient restClient) {
+        super(RestServerConfig.forInternal(props));
+        this.restClient = restClient;
+    }
+
+    public void initializeInternalResources(Map<SourceAndTarget, Herder> herders) {
+        this.herders = herders;
+        super.initializeResources();
+    }
+
+    @Override
+    protected Collection<ConnectResource> regularResources() {
+        return Arrays.asList(
+                new InternalMirrorResource(herders, restClient)
+        );
+    }
+
+    @Override
+    protected Collection<ConnectResource> adminResources() {
+        return Collections.emptyList();
+    }
+
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
new file mode 100644
index 00000000000..8b5150f56ac
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.rest.resources;
+
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import java.util.Map;
+
+@Path("/{source}/{target}/connectors")
+public class InternalMirrorResource extends InternalClusterResource {
+
+    @Context
+    private UriInfo uriInfo;
+
+    private static final Logger log = LoggerFactory.getLogger(InternalMirrorResource.class);
+
+    private final Map<SourceAndTarget, Herder> herders;
+
+    public InternalMirrorResource(Map<SourceAndTarget, Herder> herders, RestClient restClient) {
+        super(restClient);
+        this.herders = herders;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        String source = pathParam("source");
+        String target = pathParam("target");
+        Herder result = herders.get(new SourceAndTarget(source, target));
+        if (result == null) {
+            throw new NotFoundException("No replication flow found for source '" + source + "' and target '" + target + "'");
+        }
+        return result;
+    }
+
+    private String pathParam(String name) {
+        String result = uriInfo.getPathParameters().getFirst(name);
+        if (result == null)
+            throw new NotFoundException("Could not parse " + name + " cluster from request path");
+        return result;
+    }
+
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
new file mode 100644
index 00000000000..f78de9bced6
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+    private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+    private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+    private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+    private Map<String, MirrorMaker> mirrorMakers;
+
+    @BeforeEach
+    public void setup() {
+        kafkaClusters = new HashMap<>();
+        mirrorMakers = new HashMap<>();
+    }
+
+    @AfterEach
+    public void teardown() throws Throwable {
+        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+        mirrorMakers.forEach((name, mirrorMaker) ->
+            Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + name + "'", shutdownFailure)
+        );
+        kafkaClusters.forEach((name, kafkaCluster) ->
+            Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" + name + "'", shutdownFailure)
+        );
+        if (shutdownFailure.get() != null) {
+            throw shutdownFailure.get();
+        }
+    }
+
+    private EmbeddedKafkaCluster startKafkaCluster(String name, int numBrokers, Properties brokerProperties) {
+        if (kafkaClusters.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple Kafka clusters with the same name");
+
+        EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, brokerProperties);
+        kafkaClusters.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
+        if (mirrorMakers.containsKey(name))
+            throw new IllegalStateException("Cannot register multiple MirrorMaker nodes with the same name");
+
+        MirrorMaker result = new MirrorMaker(mmProps);
+        mirrorMakers.put(name, result);
+
+        result.start();
+
+        return result;
+    }
+
+    /**
+     * Tests a single-node cluster without the REST server enabled.
+     */
+    @Test
+    public void testSingleNodeCluster() throws Exception {
+        Properties brokerProps = new Properties();
+        EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
+        EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
+
+        clusterA.start();
+        clusterB.start();
+
+        try (Admin adminA = clusterA.createAdminClient();
+             Admin adminB = clusterB.createAdminClient()) {
+
+            // Cluster aliases
+            final String a = "A";
+            final String b = "B";
+            final String ab = a + "->" + b;
+            final String ba = b + "->" + a;
+            final String testTopicPrefix = "test-topic-";
+
+            Map<String, String> mmProps = new HashMap<String, String>() {{
+                    put("dedicated.mode.enable.internal.rest", "false");
+                    put("listeners", "http://localhost:0");
+                    // Refresh topics very frequently to quickly pick up on topics that are created
+                    // after the MM2 nodes are brought up during testing
+                    put("refresh.topics.interval.seconds", "1");
+                    put("clusters", String.join(", ", a, b));
+                    put(a + ".bootstrap.servers", clusterA.bootstrapServers());
+                    put(b + ".bootstrap.servers", clusterB.bootstrapServers());
+                    put(ab + ".enabled", "true");
+                    put(ab + ".topics", "^" + testTopicPrefix + ".*");
+                    put(ba + ".enabled", "false");
+                    put(ba + ".emit.heartbeats.enabled", "false");
+                    put("replication.factor", "1");
+                    put("checkpoints.topic.replication.factor", "1");
+                    put("heartbeats.topic.replication.factor", "1");
+                    put("offset-syncs.topic.replication.factor", "1");
+                    put("offset.storage.replication.factor", "1");
+                    put("status.storage.replication.factor", "1");
+                    put("config.storage.replication.factor", "1");
+                }};
+
+            // Bring up a single-node cluster
+            startMirrorMaker("single node", mmProps);
+
+            final int numMessages = 10;
+            String topic = testTopicPrefix + "1";
+
+            // Create the topic on cluster A
+            createTopic(adminA, topic);
+            // and wait for MirrorMaker to create it on cluster B
+            awaitTopicCreation(b, adminB, a + "." + topic);
+
+            // Write data to the topic on cluster A
+            writeToTopic(clusterA, topic, numMessages);
+            // and wait for MirrorMaker to copy it to cluster B
+            awaitTopicContent(clusterB, b, a + "." + topic, numMessages);
+        }
+    }
+
+    /**
+     * Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime
+     * and reconfigure its connectors and their tasks to replicate those topics correctly.
+     * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters">KIP-710</a>
+     * for more detail on the necessity for this test case.
+     */
+    @Test
+    public void testMultiNodeCluster() throws Exception {
+        Properties brokerProps = new Properties();
+        brokerProps.put("transaction.state.log.replication.factor", "1");
+        brokerProps.put("transaction.state.log.min.isr", "1");
+        EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
+        EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
+
+        clusterA.start();
+        clusterB.start();
+
+        try (Admin adminA = clusterA.createAdminClient();
+                Admin adminB = clusterB.createAdminClient()) {
+
+            // Cluster aliases
+            final String a = "A";
+            // Use a convoluted cluster name to ensure URL encoding/decoding works
+            final String b = "B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618";
+            final String ab = a + "->" + b;
+            final String ba = b + "->" + a;
+            final String testTopicPrefix = "test-topic-";
+
+            Map<String, String> mmProps = new HashMap<String, String>() {{
+                    put("dedicated.mode.enable.internal.rest", "true");
+                    put("listeners", "http://localhost:0");
+                    // Refresh topics very frequently to quickly pick up on topics that are created
+                    // after the MM2 nodes are brought up during testing
+                    put("refresh.topics.interval.seconds", "1");
+                    put("clusters", String.join(", ", a, b));
+                    put(a + ".bootstrap.servers", clusterA.bootstrapServers());
+                    put(b + ".bootstrap.servers", clusterB.bootstrapServers());
+                    // Enable exactly-once support to both validate that MirrorMaker can run with
+                    // that feature turned on, and to force cross-worker communication before
+                    // task startup
+                    put(a + ".exactly.once.source.support", "enabled");
+                    put(ab + ".enabled", "true");
+                    put(ab + ".topics", "^" + testTopicPrefix + ".*");
+                    // The name of the offset syncs topic will contain the name of the cluster in
+                    // the replication flow that it is _not_ hosted on; create the offset syncs topic
+                    // on the target cluster so that its name will contain the source cluster's name
+                    // (since the target cluster's name contains characters that are not valid for
+                    // use in a topic name)
+                    put(ab + ".offset-syncs.topic.location", "target");
+                    // Disable b -> a (and heartbeats from it) so that no topics are created that use
+                    // the target cluster's name
+                    put(ba + ".enabled", "false");
+                    put(ba + ".emit.heartbeats.enabled", "false");
+                    put("replication.factor", "1");
+                    put("checkpoints.topic.replication.factor", "1");
+                    put("heartbeats.topic.replication.factor", "1");
+                    put("offset-syncs.topic.replication.factor", "1");
+                    put("offset.storage.replication.factor", "1");
+                    put("status.storage.replication.factor", "1");
+                    put("config.storage.replication.factor", "1");
+                }};
+
+            // Bring up a three-node MirrorMaker cluster; every node is started with the same configuration
+            final int numNodes = 3;
+            for (int i = 0; i < numNodes; i++) {
+                startMirrorMaker("node " + i, mmProps);
+            }
+
+            // Create one topic on cluster A per MirrorMaker node; each one must then be mirrored to cluster B
+            final int topicsPerCluster = numNodes;
+            final int messagesPerTopic = 10;
+            for (int i = 0; i < topicsPerCluster; i++) {
+                String topic = testTopicPrefix + i;
+
+                // Create the topic on cluster A
+                createTopic(adminA, topic);
+                // and wait for MirrorMaker to create it on cluster B
+                awaitTopicCreation(b, adminB, a + "." + topic);
+
+                // Write data to the topic on cluster A
+                writeToTopic(clusterA, topic, messagesPerTopic);
+                // and wait for MirrorMaker to copy it to cluster B
+                awaitTopicContent(clusterB, b, a + "." + topic, messagesPerTopic);
+            }
+        }
+    }
+
+    private void createTopic(Admin admin, String name) throws Exception {
+        admin.createTopics(Collections.singleton(new NewTopic(name, 1, (short) 1))).all().get(); // 1 partition, replication factor 1; get() blocks until the creation request completes
+    }
+
+    private void awaitTopicCreation(String clusterName, Admin admin, String topic) throws Exception {
+        String timeoutMessage = "topic " + topic + " was not created on cluster " + clusterName + " in time";
+        waitForCondition(
+                () -> {
+                    try {
+                        Set<String> existingTopics = admin.listTopics().names().get();
+                        return existingTopics.contains(topic);
+                    } catch (Exception e) {
+                        // A failed listing is reported as "topic not present" so the condition is simply checked again
+                        log.debug("Failed to check for existence of topic {} on cluster {}", topic, clusterName, e);
+                        return false;
+                    }
+                },
+                TOPIC_CREATION_TIMEOUT_MS, timeoutMessage);
+    }
+
+    private void writeToTopic(EmbeddedKafkaCluster cluster, String topic, int numMessages) {
+        for (int i = 0; i < numMessages; i++) { // exclusive bound: exactly numMessages records (the former `<=` produced one extra)
+            cluster.produce(topic, Integer.toString(i)); // the loop index, rendered as a string, is the second argument of each produce call
+        }
+    }
+
+    private void awaitTopicContent(EmbeddedKafkaCluster cluster, String clusterName, String topic, int numMessages) throws Exception {
+        try (Consumer<?, ?> consumer = cluster.createConsumer(Collections.singletonMap(AUTO_OFFSET_RESET_CONFIG, "earliest"))) {
+            consumer.subscribe(Collections.singleton(topic));
+            AtomicInteger messagesRead = new AtomicInteger(0); // running total across polls; also reported if the wait times out
+            waitForCondition(
+                    () -> {
+                        ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(1));
+                        return messagesRead.addAndGet(records.count()) >= numMessages;
+                    },
+                    TOPIC_REPLICATION_TIMEOUT_MS,
+                    () -> "could not read " + numMessages + " records from topic " + topic + " on cluster " + clusterName + " in time; only read " + messagesRead.get()
+            );
+        }
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
index 8831081bf44..de666c7bd60 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
 import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.slf4j.Logger;
@@ -124,7 +125,7 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
 
         RestClient restClient = new RestClient(config);
 
-        RestServer restServer = new RestServer(config, restClient);
+        ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, workerProps);
         restServer.initializeServer();
 
         URI advertisedUrl = restServer.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 1da9fa40622..a887a5bccd8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -37,6 +37,7 @@ import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -97,7 +98,8 @@ public class ConnectDistributed extends AbstractConnectCli<DistributedConfig> {
         // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
         return new DistributedHerder(config, Time.SYSTEM, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                restServer.advertisedUrl().toString(), restClient, connectorClientConfigOverridePolicy, sharedAdmin);
+                restServer.advertisedUrl().toString(), restClient, connectorClientConfigOverridePolicy,
+                Collections.emptyList(), sharedAdmin);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index e5ab246c0bd..e16e3bd72a1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,13 +33,13 @@ public class Connect {
     private static final Logger log = LoggerFactory.getLogger(Connect.class);
 
     private final Herder herder;
-    private final RestServer rest;
+    private final ConnectRestServer rest;
     private final CountDownLatch startLatch = new CountDownLatch(1);
     private final CountDownLatch stopLatch = new CountDownLatch(1);
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
     private final ShutdownHook shutdownHook;
 
-    public Connect(Herder herder, RestServer rest) {
+    public Connect(Herder herder, ConnectRestServer rest) {
         log.debug("Kafka Connect instance created");
         this.herder = herder;
         this.rest = rest;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 388803da946..21386fc2338 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -24,19 +24,15 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.config.SslClientAuth;
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -44,8 +40,6 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
-import org.eclipse.jetty.util.StringUtil;
-
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX;
@@ -57,9 +51,6 @@ public class WorkerConfig extends AbstractConfig {
     private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class);
 
     private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
-    private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
-            Arrays.asList("set", "add", "setDate", "addDate")
-    );
 
     public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
     public static final String BOOTSTRAP_SERVERS_DOC
@@ -119,47 +110,6 @@ public class WorkerConfig extends AbstractConfig {
             + "running with exactly-once support.";
     public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
 
-    public static final String LISTENERS_CONFIG = "listeners";
-    private static final String LISTENERS_DOC
-            = "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" +
-            " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
-            " Leave hostname empty to bind to default interface.\n" +
-            " Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084";
-    static final List<String> LISTENERS_DEFAULT = Collections.singletonList("http://:8083");
-
-    public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
-    private static final String REST_ADVERTISED_HOST_NAME_DOC
-            = "If this is set, this is the hostname that will be given out to other workers to connect to.";
-
-    public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port";
-    private static final String REST_ADVERTISED_PORT_DOC
-            = "If this is set, this is the port that will be given out to other workers to connect to.";
-
-    public static final String REST_ADVERTISED_LISTENER_CONFIG = "rest.advertised.listener";
-    private static final String REST_ADVERTISED_LISTENER_DOC
-            = "Sets the advertised listener (HTTP or HTTPS) which will be given to other workers to use.";
-
-    public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
-    protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
-            "Value to set the Access-Control-Allow-Origin header to for REST API requests." +
-                    "To enable cross origin access, set this to the domain of the application that should be permitted" +
-                    " to access the API, or '*' to allow access from any domain. The default value only allows access" +
-                    " from the domain of the REST API.";
-    protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";
-
-    public static final String ACCESS_CONTROL_ALLOW_METHODS_CONFIG = "access.control.allow.methods";
-    protected static final String ACCESS_CONTROL_ALLOW_METHODS_DOC =
-        "Sets the methods supported for cross origin requests by setting the Access-Control-Allow-Methods header. "
-        + "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD.";
-    protected static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = "";
-
-    public static final String ADMIN_LISTENERS_CONFIG = "admin.listeners";
-    protected static final String ADMIN_LISTENERS_DOC = "List of comma-separated URIs the Admin REST API will listen on." +
-            " The supported protocols are HTTP and HTTPS." +
-            " An empty or blank string will disable this feature." +
-            " The default behavior is to use the regular listener (specified by the 'listeners' property).";
-    public static final String ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX = "admin.listeners.https.";
-
     public static final String PLUGIN_PATH_CONFIG = "plugin.path";
     protected static final String PLUGIN_PATH_DOC = "List of paths separated by commas (,) that "
             + "contain plugins (connectors, converters, transformations). The list should consist"
@@ -182,13 +132,6 @@ public class WorkerConfig extends AbstractConfig {
             + "<code>ConfigProvider</code> allows you to replace variable references in connector configurations, "
             + "such as for externalized secrets. ";
 
-    public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
-    protected static final String REST_EXTENSION_CLASSES_DOC =
-            "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
-            + "in the order specified. Implementing the interface  "
-            + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
-            + "Typically used to add custom capability like logging, security, etc. ";
-
     public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy";
     public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
         "Class name or alias of implementation of <code>ConnectorClientConfigOverridePolicy</code>. Defines what client configurations can be "
@@ -227,17 +170,13 @@ public class WorkerConfig extends AbstractConfig {
             + "to create topics automatically.";
     protected static final boolean TOPIC_CREATION_ENABLE_DEFAULT = true;
 
-    public static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config";
-    protected static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
-    protected static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
-
     /**
      * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
      * bootstrap their own ConfigDef.
      * @return a ConfigDef with all the common options specified
      */
     protected static ConfigDef baseConfigDef() {
-        return new ConfigDef()
+        ConfigDef result = new ConfigDef()
                 .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
                         Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
                 .define(CLIENT_DNS_LOOKUP_CONFIG,
@@ -258,16 +197,6 @@ public class WorkerConfig extends AbstractConfig {
                         Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
                 .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
                         Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
-                .define(LISTENERS_CONFIG, Type.LIST, LISTENERS_DEFAULT, new ListenersValidator(), Importance.LOW, LISTENERS_DOC)
-                .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING,  null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
-                .define(REST_ADVERTISED_PORT_CONFIG, Type.INT,  null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
-                .define(REST_ADVERTISED_LISTENER_CONFIG, Type.STRING,  null, Importance.LOW, REST_ADVERTISED_LISTENER_DOC)
-                .define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
-                        ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
-                        ACCESS_CONTROL_ALLOW_ORIGIN_DOC)
-                .define(ACCESS_CONTROL_ALLOW_METHODS_CONFIG, Type.STRING,
-                        ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, Importance.LOW,
-                        ACCESS_CONTROL_ALLOW_METHODS_DOC)
                 .define(PLUGIN_PATH_CONFIG,
                         Type.LIST,
                         null,
@@ -292,30 +221,37 @@ public class WorkerConfig extends AbstractConfig {
                         true,
                         Importance.LOW,
                         CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
-                .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
-                        ConfigDef.Type.STRING, SslClientAuth.NONE.toString(), in(Utils.enumOptions(SslClientAuth.class)), ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         HEADER_CONVERTER_CLASS_DEFAULT,
                         Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
                 .define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
                         Collections.emptyList(),
                         Importance.LOW, CONFIG_PROVIDERS_DOC)
-                .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
-                        Importance.LOW, REST_EXTENSION_CLASSES_DOC)
-                .define(ADMIN_LISTENERS_CONFIG, Type.LIST, null,
-                        new AdminListenersValidator(), Importance.LOW, ADMIN_LISTENERS_DOC)
                 .define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
                         Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
-                .define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT,
-                        Importance.LOW, TOPIC_TRACKING_ENABLE_DOC)
-                .define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
-                        Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC)
                 .define(TOPIC_CREATION_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_CREATION_ENABLE_DEFAULT, Importance.LOW,
                         TOPIC_CREATION_ENABLE_DOC)
-                .define(RESPONSE_HTTP_HEADERS_CONFIG, Type.STRING, RESPONSE_HTTP_HEADERS_DEFAULT,
-                        new ResponseHttpHeadersValidator(), Importance.LOW, RESPONSE_HTTP_HEADERS_DOC)
                 // security support
                 .withClientSslSupport();
+        addTopicTrackingConfig(result);
+        RestServerConfig.addPublicConfig(result);
+        return result;
+    }
+
+    public static void addTopicTrackingConfig(ConfigDef configDef) {
+        // Registers both topic-tracking settings on the supplied definition as low-importance booleans
+        configDef.define(
+                TOPIC_TRACKING_ENABLE_CONFIG,
+                Type.BOOLEAN,
+                TOPIC_TRACKING_ENABLE_DEFAULT,
+                Importance.LOW,
+                TOPIC_TRACKING_ENABLE_DOC);
+        configDef.define(
+                TOPIC_TRACKING_ALLOW_RESET_CONFIG,
+                Type.BOOLEAN,
+                TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
+                Importance.LOW,
+                TOPIC_TRACKING_ALLOW_RESET_DOC);
+    }
 
     private String kafkaClusterId;
@@ -396,7 +332,7 @@ public class WorkerConfig extends AbstractConfig {
         return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG));
     }
 
-    public Integer getRebalanceTimeout() {
+    public Integer rebalanceTimeout() {
         return null;
     }
 
@@ -477,125 +413,4 @@ public class WorkerConfig extends AbstractConfig {
         logPluginPathConfigProviderWarning(props);
     }
 
-    // Visible for testing
-    static void validateHttpResponseHeaderConfig(String config) {
-        try {
-            // validate format
-            String[] configTokens = config.trim().split("\\s+", 2);
-            if (configTokens.length != 2) {
-                throw new ConfigException(String.format("Invalid format of header config '%s'. "
-                        + "Expected: '[action] [header name]:[header value]'", config));
-            }
-
-            // validate action
-            String method = configTokens[0].trim();
-            validateHeaderConfigAction(method);
-
-            // validate header name and header value pair
-            String header = configTokens[1];
-            String[] headerTokens = header.trim().split(":");
-            if (headerTokens.length != 2) {
-                throw new ConfigException(
-                        String.format("Invalid format of header name and header value pair '%s'. "
-                                + "Expected: '[header name]:[header value]'", header));
-            }
-
-            // validate header name
-            String headerName = headerTokens[0].trim();
-            if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) {
-                throw new ConfigException(String.format("Invalid header name '%s'. "
-                        + "The '[header name]' cannot contain whitespace", headerName));
-            }
-        } catch (ArrayIndexOutOfBoundsException e) {
-            throw new ConfigException(String.format("Invalid header config '%s'.", config), e);
-        }
-    }
-
-    // Visible for testing
-    static void validateHeaderConfigAction(String action) {
-        if (HEADER_ACTIONS.stream().noneMatch(action::equalsIgnoreCase)) {
-            throw new ConfigException(String.format("Invalid header config action: '%s'. "
-                    + "Expected one of %s", action, HEADER_ACTIONS));
-        }
-    }
-
-    private static class ListenersValidator implements ConfigDef.Validator {
-        @Override
-        public void ensureValid(String name, Object value) {
-            if (!(value instanceof List)) {
-                throw new ConfigException("Invalid value type for listeners (expected list of URLs , ex: http://localhost:8080,https://localhost:8443).");
-            }
-
-            List<?> items = (List<?>) value;
-            if (items.isEmpty()) {
-                throw new ConfigException("Invalid value for listeners, at least one URL is expected, ex: http://localhost:8080,https://localhost:8443.");
-            }
-
-            for (Object item : items) {
-                if (!(item instanceof String)) {
-                    throw new ConfigException("Invalid type for listeners (expected String).");
-                }
-                if (Utils.isBlank((String) item)) {
-                    throw new ConfigException("Empty URL found when parsing listeners list.");
-                }
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
-        }
-    }
-
-    private static class AdminListenersValidator implements ConfigDef.Validator {
-        @Override
-        public void ensureValid(String name, Object value) {
-            if (value == null) {
-                return;
-            }
-
-            if (!(value instanceof List)) {
-                throw new ConfigException("Invalid value type for admin.listeners (expected list).");
-            }
-
-            List<?> items = (List<?>) value;
-            if (items.isEmpty()) {
-                return;
-            }
-
-            for (Object item : items) {
-                if (!(item instanceof String)) {
-                    throw new ConfigException("Invalid type for admin.listeners (expected String).");
-                }
-                if (Utils.isBlank((String) item)) {
-                    throw new ConfigException("Empty URL found when parsing admin.listeners list.");
-                }
-            }
-        }
-
-        @Override
-        public String toString() {
-            return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
-        }
-    }
-
-    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
-        @Override
-        public void ensureValid(String name, Object value) {
-            String strValue = (String) value;
-            if (Utils.isBlank(strValue)) {
-                return;
-            }
-
-            String[] configs = StringUtil.csvSplit(strValue); // handles and removed surrounding quotes
-            Arrays.stream(configs).forEach(WorkerConfig::validateHttpResponseHeaderConfig);
-        }
-
-        @Override
-        public String toString() {
-            return "Comma-separated header rules, where each header rule is of the form "
-                    + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
-                    + "if any part of a header rule contains a comma";
-        }
-    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 1acdae79f80..9cacb6e1ee6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -509,7 +509,7 @@ public class DistributedConfig extends WorkerConfig {
     private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
 
     @Override
-    public Integer getRebalanceTimeout() {
+    public Integer rebalanceTimeout() {
         return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 6f7a372f413..b68cc443ea7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -199,6 +199,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private Set<String> connectorTargetStateChanges = new HashSet<>();
     // Access to this map is protected by the herder's monitor
     private final Map<String, ZombieFencing> activeZombieFencings = new HashMap<>();
+    private final List<String> restNamespace;
     private boolean needsReconfigRebalance;
     private volatile boolean fencedFromConfigTopic;
     private volatile int generation;
@@ -228,9 +229,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
      * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
      * @param statusBackingStore the backing store for statuses; may not be null
      * @param configBackingStore the backing store for connector configurations; may not be null
-     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null, but may be an arbitrary placeholder
+     *                           value if this worker does not expose a REST API
+     * @param restClient         a REST client that can be used to issue requests to other workers in the cluster; may
+     *                           be null if inter-worker communication is not enabled
      * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
      *                                            in connector configurations; may not be null
+     * @param restNamespace      zero or more path elements to prepend to the paths of forwarded REST requests; may be empty, but not null
      * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
      *                           after all services and resources owned by this herder are stopped
      */
@@ -243,10 +248,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                              String restUrl,
                              RestClient restClient,
                              ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             List<String> restNamespace,
                              AutoCloseable... uponShutdown) {
-        this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore,
-             null, restUrl, restClient, worker.metrics(),
-             time, connectorClientConfigOverridePolicy, uponShutdown);
+        this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, restClient, worker.metrics(),
+                time, connectorClientConfigOverridePolicy, restNamespace, uponShutdown);
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
@@ -263,6 +268,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                       ConnectMetrics metrics,
                       Time time,
                       ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                      List<String> restNamespace,
                       AutoCloseable... uponShutdown) {
         super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
 
@@ -277,6 +283,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         this.keyGenerator = config.getInternalRequestKeyGenerator();
         this.restClient = restClient;
         this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.restNamespace = Objects.requireNonNull(restNamespace);
         this.uponShutdown = Arrays.asList(uponShutdown);
 
         String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
@@ -1162,11 +1169,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 callback.onCompletion(null, null);
             } else if (error instanceof NotLeaderException) {
                 if (restClient != null) {
-                    String forwardedUrl = ((NotLeaderException) error).forwardUrl() + "connectors/" + id.connector() + "/fence";
-                    log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), forwardedUrl);
+                    String workerUrl = ((NotLeaderException) error).forwardUrl();
+                    String fenceUrl = namespacedUrl(workerUrl)
+                            .path("connectors")
+                            .path(id.connector())
+                            .path("fence")
+                            .build()
+                            .toString();
+                    log.trace("Forwarding zombie fencing request for connector {} to leader at {}", id.connector(), fenceUrl);
                     forwardRequestExecutor.execute(() -> {
                         try {
-                            restClient.httpRequest(forwardedUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
+                            restClient.httpRequest(fenceUrl, "PUT", null, null, null, sessionKey, requestSignatureAlgorithm);
                             callback.onCompletion(null, null);
                         } catch (Throwable t) {
                             callback.onCompletion(t, null);
@@ -1175,12 +1188,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 } else {
                     callback.onCompletion(
                             new ConnectException(
-                                    // TODO: Update this message if KIP-710 is accepted and merged
-                                    //       (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
                                     "This worker is not able to communicate with the leader of the cluster, "
                                             + "which is required for exactly-once source tasks. If running MirrorMaker 2 "
-                                            + "in dedicated mode, consider either disabling exactly-once support, or deploying "
-                                            + "the connectors for MirrorMaker 2 directly onto a distributed Kafka Connect cluster."
+                                            + "in dedicated mode, consider enabling inter-worker communication via the "
+                                            + "'dedicated.mode.enable.internal.rest' property."
                             ),
                             null
                     );
@@ -1936,12 +1947,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                     writeToConfigTopicAsLeader(() -> configBackingStore.putTaskConfigs(connName, rawTaskProps));
                     cb.onCompletion(null, null);
                 } else if (restClient == null) {
-                    // TODO: Update this message if KIP-710 is accepted and merged
-                    //       (https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters)
                     throw new NotLeaderException("This worker is not able to communicate with the leader of the cluster, "
                             + "which is required for dynamically-reconfiguring connectors. If running MirrorMaker 2 "
-                            + "in dedicated mode, consider deploying the connectors for MirrorMaker 2 directly onto a "
-                            + "distributed Kafka Connect cluster.",
+                            + "in dedicated mode, consider enabling inter-worker communication via the "
+                            + "'dedicated.mode.enable.internal.rest' property.",
                             leaderUrl()
                     );
                 } else {
@@ -1956,7 +1965,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                                         "because the URL of the leader's REST interface is empty!"), null);
                                 return;
                             }
-                            String reconfigUrl = UriBuilder.fromUri(leaderUrl)
+                            String reconfigUrl = namespacedUrl(leaderUrl)
                                     .path("connectors")
                                     .path(connName)
                                     .path("tasks")
@@ -1965,6 +1974,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                             log.trace("Forwarding task configurations for connector {} to leader", connName);
                             restClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, sessionKey, requestSignatureAlgorithm);
                             cb.onCompletion(null, null);
+                            log.trace("Request to leader to reconfigure connector tasks succeeded");
                         } catch (ConnectException e) {
                             log.error("Request to leader to reconfigure connector tasks failed", e);
                             cb.onCompletion(e, null);
@@ -2460,6 +2470,22 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         return false;
     }
 
+    /**
+     * Start building a URL for a request to another worker in the cluster, with every element of
+     * {@code restNamespace} appended, in order, as a path segment after the worker's base URL.
+     * Callers add the resource-specific path on top of the returned builder.
+     *
+     * @param workerUrl the base URL of the target worker's REST API
+     * @return a builder rooted at {@code workerUrl} plus the namespace path elements
+     */
+    private UriBuilder namespacedUrl(String workerUrl) {
+        UriBuilder builder = UriBuilder.fromUri(workerUrl);
+        for (String pathElement : restNamespace) {
+            builder = builder.path(pathElement);
+        }
+        return builder;
+    }
+
     /**
      * Represents an active zombie fencing: that is, an in-progress attempt to invoke
      * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
new file mode 100644
index 00000000000..ca9eb731c0c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
+import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource;
+import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
+import org.apache.kafka.connect.runtime.rest.resources.RootResource;
+import org.glassfish.jersey.server.ResourceConfig;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * A {@link RestServer} that contributes the root, connectors, internal and connector-plugins
+ * resources as its regular resources and the logging resource as its admin resource, and that
+ * registers REST extensions alongside the regular resources.
+ */
+public class ConnectRestServer extends RestServer {
+
+    private final RestClient restClient;
+    // Set by initializeResources(Herder); read when the resources below are created
+    private Herder herder;
+
+    /**
+     * @param rebalanceTimeoutMs handed to {@link RestServerConfig#forPublic} along with {@code props}
+     * @param restClient         the client given to the connectors and internal resources
+     * @param props              the properties from which the server's configuration is built
+     */
+    public ConnectRestServer(Integer rebalanceTimeoutMs, RestClient restClient, Map<?, ?> props) {
+        super(RestServerConfig.forPublic(rebalanceTimeoutMs, props));
+        this.restClient = restClient;
+    }
+
+    /**
+     * Remember the herder that the REST resources are created with, then initialize those resources.
+     *
+     * @param herder the herder backing this server's resources
+     */
+    public void initializeResources(Herder herder) {
+        this.herder = herder;
+        super.initializeResources();
+    }
+
+    @Override
+    protected Collection<ConnectResource> regularResources() {
+        ConnectResource root = new RootResource(herder);
+        ConnectResource connectors = new ConnectorsResource(herder, config, restClient);
+        ConnectResource internal = new InternalConnectResource(herder, restClient);
+        ConnectResource connectorPlugins = new ConnectorPluginsResource(herder);
+        return Arrays.asList(root, connectors, internal, connectorPlugins);
+    }
+
+    @Override
+    protected Collection<ConnectResource> adminResources() {
+        ConnectResource logging = new LoggingResource();
+        return Arrays.asList(logging);
+    }
+
+    @Override
+    protected void configureRegularResources(ResourceConfig resourceConfig) {
+        // REST extensions are registered against the regular (non-admin) resource config
+        registerRestExtensions(herder, resourceConfig);
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
new file mode 100644
index 00000000000..1f71070047c
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
+import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.FutureCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Helper for REST resources that waits for a herder operation to complete and, when the operation fails
+ * because a different worker has to serve the request, forwards the request to that worker.
+ */
+public class HerderRequestHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(HerderRequestHandler.class);
+
+    private final RestClient restClient;
+
+    private long requestTimeoutMs;
+
+    /**
+     * @param restClient       the client used to forward requests to other workers
+     * @param requestTimeoutMs how long, in milliseconds, to wait for a herder operation to complete
+     */
+    public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
+        this.restClient = restClient;
+        this.requestTimeoutMs = requestTimeoutMs;
+    }
+
+    /**
+     * Change how long to wait for herder operations to complete.
+     *
+     * @param requestTimeoutMs the new timeout in milliseconds; must be at least 1
+     * @throws IllegalArgumentException if {@code requestTimeoutMs} is less than 1
+     */
+    public void requestTimeoutMs(long requestTimeoutMs) {
+        if (requestTimeoutMs < 1) {
+            throw new IllegalArgumentException("REST request timeout must be positive");
+        }
+        this.requestTimeoutMs = requestTimeoutMs;
+    }
+
+    /**
+     * Wait for a FutureCallback to complete. If it succeeds, return its result. If it fails with a
+     * {@link RequestTargetException}, try to forward the request to the worker URL carried by that exception
+     * and return the translated response.
+     *
+     * @param cb              the callback whose completion is awaited, for at most the configured request timeout
+     * @param path            the path appended to the forward URL when the request is forwarded
+     * @param method          the HTTP method to use for the forwarded request
+     * @param headers         the headers handed to the REST client for the forwarded request
+     * @param queryParameters additional query parameters for the forwarded request; may be null
+     * @param body            the body handed to the REST client for the forwarded request
+     * @param resultType      the type handed to the REST client for reading the forwarded response
+     * @param translator      converts the forwarded response into this method's return type
+     * @param forward         {@code null} to forward with {@code forward=true}, {@code true} to forward with
+     *                        {@code forward=false}, {@code false} to refuse forwarding
+     * @return the callback's result, or the translated response to the forwarded request
+     * @throws ConnectRestException with status 409 if no forward URL is known, forwarding is refused, or the
+     *                              callback failed with a {@link RebalanceNeededException}; with status 500 if
+     *                              the wait times out or is interrupted
+     * @throws Throwable            the cause of any other failure of the callback
+     */
+    public <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
+                                             String path,
+                                             String method,
+                                             HttpHeaders headers,
+                                             Map<String, String> queryParameters,
+                                             Object body,
+                                             TypeReference<U> resultType,
+                                             Translator<T, U> translator,
+                                             Boolean forward) throws Throwable {
+        try {
+            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+
+            if (cause instanceof RequestTargetException) {
+                if (forward == null || forward) {
+                    // the only time we allow recursive forwarding is when no forward flag has
+                    // been set, which should only be seen by the first worker to handle a user request.
+                    // this gives two total hops to resolve the request before giving up.
+                    boolean recursiveForward = forward == null;
+                    RequestTargetException targetException = (RequestTargetException) cause;
+                    String forwardedUrl = targetException.forwardUrl();
+                    if (forwardedUrl == null) {
+                        // the target didn't know of the leader at this moment.
+                        throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+                                "Cannot complete request momentarily due to no known leader URL, "
+                                        + "likely because a rebalance was underway.");
+                    }
+                    UriBuilder uriBuilder = UriBuilder.fromUri(forwardedUrl)
+                            .path(path)
+                            .queryParam("forward", recursiveForward);
+                    if (queryParameters != null) {
+                        queryParameters.forEach(uriBuilder::queryParam);
+                    }
+                    String forwardUrl = uriBuilder.build().toString();
+                    log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
+                    return translator.translate(restClient.httpRequest(forwardUrl, method, headers, body, resultType));
+                } else {
+                    // we should find the right target for the query within two hops, so if
+                    // we don't, it probably means that a rebalance has taken place.
+                    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+                            "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
+                }
+            } else if (cause instanceof RebalanceNeededException) {
+                throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
+                        "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
+            }
+
+            throw cause;
+        } catch (TimeoutException e) {
+            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
+            // error is the best option
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
+        } catch (InterruptedException e) {
+            // Restore the thread's interrupt status before translating the exception, so that code
+            // further up the call stack can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
+        }
+    }
+
+    /**
+     * Variant of {@link #completeOrForwardRequest(FutureCallback, String, String, HttpHeaders, Map, Object, TypeReference, Translator, Boolean)}
+     * that forwards no additional query parameters.
+     */
+    public <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
+                                             TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
+        return completeOrForwardRequest(cb, path, method, headers, null, body, resultType, translator, forward);
+    }
+
+    /**
+     * Variant for requests whose forwarded response body already has the callback's result type, so the
+     * response body is returned as-is.
+     */
+    public <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
+                                          TypeReference<T> resultType, Boolean forward) throws Throwable {
+        return completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<>(), forward);
+    }
+
+    /**
+     * Variant that passes a null result type to the REST client and returns the forwarded response body as-is.
+     */
+    public <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers,
+                                          Object body, Boolean forward) throws Throwable {
+        return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<>(), forward);
+    }
+
+    /**
+     * Converts the response to a forwarded request into the type returned to the caller.
+     *
+     * @param <T> the type returned to the caller
+     * @param <U> the type of the forwarded response's body
+     */
+    @FunctionalInterface
+    public interface Translator<T, U> {
+        T translate(RestClient.HttpResponse<U> response);
+    }
+
+    /**
+     * A {@link Translator} that returns the body of the forwarded response unchanged.
+     */
+    public static class IdentityTranslator<T> implements Translator<T, T> {
+        @Override
+        public T translate(RestClient.HttpResponse<T> response) {
+            return response.body();
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 16ea02ebbdd..d9ba6194124 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -19,8 +19,8 @@ package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.Crypto;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
@@ -51,9 +51,10 @@ import java.util.concurrent.TimeoutException;
 public class RestClient {
     private static final Logger log = LoggerFactory.getLogger(RestClient.class);
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
-    private WorkerConfig config;
 
-    public RestClient(WorkerConfig config) {
+    private final AbstractConfig config;
+
+    public RestClient(AbstractConfig config) {
         this.config = config;
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 280c3609ba7..e74000ff137 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -24,15 +24,10 @@ import org.apache.kafka.connect.health.ConnectClusterDetails;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
-import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
-import org.apache.kafka.connect.runtime.rest.resources.RootResource;
 import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.CustomRequestLog;
@@ -67,12 +62,10 @@ import java.util.Locale;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.kafka.connect.runtime.WorkerConfig.ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX;
-
 /**
  * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
  */
-public class RestServer {
+public abstract class RestServer {
     private static final Logger log = LoggerFactory.getLogger(RestServer.class);
 
     // Used to distinguish between Admin connectors and regular REST API connectors when binding admin handlers
@@ -84,9 +77,7 @@ public class RestServer {
     private static final String PROTOCOL_HTTP = "http";
     private static final String PROTOCOL_HTTPS = "https";
 
-    private final WorkerConfig config;
-
-    private final RestClient restClient;
+    protected final RestServerConfig config;
     private final ContextHandlerCollection handlers;
     private final Server jettyServer;
 
@@ -96,12 +87,11 @@ public class RestServer {
     /**
      * Create a REST server for this herder using the specified configs.
      */
-    public RestServer(WorkerConfig config, RestClient restClient) {
+    protected RestServer(RestServerConfig config) {
         this.config = config;
-        this.restClient = restClient;
 
-        List<String> listeners = config.getList(WorkerConfig.LISTENERS_CONFIG);
-        List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
+        List<String> listeners = config.listeners();
+        List<String> adminListeners = config.adminListeners();
 
         jettyServer = new Server();
         handlers = new ContextHandlerCollection();
@@ -161,7 +151,7 @@ public class RestServer {
         if (PROTOCOL_HTTPS.equals(protocol)) {
             SslContextFactory ssl;
             if (isAdmin) {
-                ssl = SSLUtils.createServerSideSslContextFactory(config, ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX);
+                ssl = SSLUtils.createServerSideSslContextFactory(config, RestServerConfig.ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX);
             } else {
                 ssl = SSLUtils.createServerSideSslContextFactory(config);
             }
@@ -210,44 +200,47 @@ public class RestServer {
         }
 
         log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
-        log.info("REST admin endpoints at " + adminUrl());
+        URI adminUrl = adminUrl();
+        if (adminUrl != null)
+            log.info("REST admin endpoints at " + adminUrl);
     }
 
-    public void initializeResources(Herder herder) {
+    protected final void initializeResources() {
         log.info("Initializing REST resources");
+        resources = new ArrayList<>();
 
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
-        this.resources = new ArrayList<>();
-        resources.add(new RootResource(herder));
-        resources.add(new ConnectorsResource(herder, config, restClient));
-        resources.add(new ConnectorPluginsResource(herder));
-        resources.forEach(resourceConfig::register);
+        Collection<ConnectResource> regularResources = regularResources();
+        regularResources.forEach(resourceConfig::register);
+        resources.addAll(regularResources);
 
         resourceConfig.register(ConnectExceptionMapper.class);
         resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
 
-        registerRestExtensions(herder, resourceConfig);
+        configureRegularResources(resourceConfig);
 
-        List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
+        List<String> adminListeners = config.adminListeners();
         ResourceConfig adminResourceConfig;
         if (adminListeners == null) {
             log.info("Adding admin resources to main listener");
             adminResourceConfig = resourceConfig;
-            LoggingResource loggingResource = new LoggingResource();
-            this.resources.add(loggingResource);
-            adminResourceConfig.register(loggingResource);
+            Collection<ConnectResource> adminResources = adminResources();
+            resources.addAll(adminResources);
+            adminResources.forEach(adminResourceConfig::register);
+            configureAdminResources(adminResourceConfig);
         } else if (adminListeners.size() > 0) {
             // TODO: we need to check if these listeners are same as 'listeners'
             // TODO: the following code assumes that they are different
             log.info("Adding admin resources to admin listener");
             adminResourceConfig = new ResourceConfig();
             adminResourceConfig.register(new JacksonJsonProvider());
-            LoggingResource loggingResource = new LoggingResource();
-            this.resources.add(loggingResource);
-            adminResourceConfig.register(loggingResource);
+            Collection<ConnectResource> adminResources = adminResources();
+            resources.addAll(adminResources);
+            adminResources.forEach(adminResourceConfig::register);
             adminResourceConfig.register(ConnectExceptionMapper.class);
+            configureAdminResources(adminResourceConfig);
         } else {
             log.info("Skipping adding admin resources");
             // set up adminResource but add no handlers to it
@@ -273,21 +266,21 @@ public class RestServer {
             contextHandlers.add(adminContext);
         }
 
-        String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
+        String allowedOrigins = config.allowedOrigins();
         if (!Utils.isBlank(allowedOrigins)) {
             FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
             filterHolder.setName("cross-origin");
             filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins);
-            String allowedMethods = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG);
+            String allowedMethods = config.allowedMethods();
             if (!Utils.isBlank(allowedMethods)) {
                 filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods);
             }
             context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
         }
 
-        String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+        String headerConfig = config.responseHeaders();
         if (!Utils.isBlank(headerConfig)) {
-            configureHttpResponseHeaderFilter(context);
+            configureHttpResponseHeaderFilter(context, headerConfig);
         }
 
         handlers.setHandlers(contextHandlers.toArray(new Handler[0]));
@@ -309,6 +302,44 @@ public class RestServer {
         log.info("REST resources initialized; server is started and ready to handle requests");
     }
 
+    /**
+     * @return the {@link ConnectResource resources} that should be registered with the
+     * standard (i.e., non-admin) listener for this server; may be empty, but not null
+     */
+    protected abstract Collection<ConnectResource> regularResources();
+
+    /**
+     * @return the {@link ConnectResource resources} that should be registered with the
+     * admin listener for this server; may be empty, but not null
+     */
+    protected abstract Collection<ConnectResource> adminResources();
+
+    /**
+     * Pluggable hook to customize the regular (i.e., non-admin) resources on this server
+     * after they have been instantiated and registered with the given {@link ResourceConfig}.
+     * This may be used to, for example, add REST extensions via {@link #registerRestExtensions(Herder, ResourceConfig)}.
+     * <p>
+     * <em>N.B.: Classes do <b>not</b> need to register the resources provided in {@link #regularResources()} with
+     * the {@link ResourceConfig} parameter in this method; they are automatically registered by the parent class.</em>
+     * @param resourceConfig the {@link ResourceConfig} that the server's regular resources are registered with; never null
+     */
+    protected void configureRegularResources(ResourceConfig resourceConfig) {
+        // No-op by default
+    }
+
+    /**
+     * Pluggable hook to customize the admin resources on this server after they have been instantiated and registered
+     * with the given {@link ResourceConfig}. This may be used to, for example, add REST extensions via
+     * {@link #registerRestExtensions(Herder, ResourceConfig)}.
+     * <p>
+     * <em>N.B.: Classes do <b>not</b> need to register the resources provided in {@link #adminResources()} with
+     * the {@link ResourceConfig} parameter in this method; they are automatically registered by the parent class.</em>
+     * @param adminResourceConfig the {@link ResourceConfig} that the server's admin resources are registered with; never null
+     */
+    protected void configureAdminResources(ResourceConfig adminResourceConfig) {
+        // No-op by default
+    }
+
     public URI serverUrl() {
         return jettyServer.getURI();
     }
@@ -346,13 +377,13 @@ public class RestServer {
         ServerConnector serverConnector = findConnector(advertisedSecurityProtocol);
         builder.scheme(advertisedSecurityProtocol);
 
-        String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
+        String advertisedHostname = config.advertisedHostName();
         if (advertisedHostname != null && !advertisedHostname.isEmpty())
             builder.host(advertisedHostname);
         else if (serverConnector != null && serverConnector.getHost() != null && serverConnector.getHost().length() > 0)
             builder.host(serverConnector.getHost());
 
-        Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
+        Integer advertisedPort = config.advertisedPort();
         if (advertisedPort != null)
             builder.port(advertisedPort);
         else if (serverConnector != null && serverConnector.getPort() > 0)
@@ -376,7 +407,7 @@ public class RestServer {
         }
 
         if (adminConnector == null) {
-            List<String> adminListeners = config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG);
+            List<String> adminListeners = config.adminListeners();
             if (adminListeners == null) {
                 return advertisedUrl();
             } else if (adminListeners.isEmpty()) {
@@ -399,9 +430,9 @@ public class RestServer {
     }
 
     String determineAdvertisedProtocol() {
-        String advertisedSecurityProtocol = config.getString(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG);
+        String advertisedSecurityProtocol = config.advertisedListener();
         if (advertisedSecurityProtocol == null) {
-            String listeners = (String) config.originals().get(WorkerConfig.LISTENERS_CONFIG);
+            String listeners = config.rawListeners();
 
             if (listeners == null)
                 return PROTOCOL_HTTP;
@@ -440,14 +471,14 @@ public class RestServer {
         return null;
     }
 
-    void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
+    protected final void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
         connectRestExtensions = herder.plugins().newPlugins(
-            config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+            config.restExtensions(),
             config, ConnectRestExtension.class);
 
         long herderRequestTimeoutMs = ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS;
 
-        Integer rebalanceTimeoutMs = config.getRebalanceTimeout();
+        Integer rebalanceTimeoutMs = config.rebalanceTimeoutMs();
 
         if (rebalanceTimeoutMs != null) {
             herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
@@ -472,8 +503,7 @@ public class RestServer {
      * Register header filter to ServletContextHandler.
      * @param context The servlet context handler
      */
-    protected void configureHttpResponseHeaderFilter(ServletContextHandler context) {
-        String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+    protected void configureHttpResponseHeaderFilter(ServletContextHandler context, String headerConfig) {
         FilterHolder headerFilterHolder = new FilterHolder(HeaderFilter.class);
         headerFilterHolder.setInitParameter("headerConfig", headerConfig);
         context.addFilter(headerFilterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
new file mode 100644
index 00000000000..0d6d06a4a59
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.eclipse.jetty.util.StringUtil;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * Defines the configuration surface for a {@link RestServer} instance, with support for both
+ * {@link #forInternal(Map) internal-only} and {@link #forPublic(Integer, Map) user-facing}
+ * servers. An internal-only server will expose only the endpoints and listeners necessary for
+ * intra-cluster communication; these include the task-write and zombie-fencing endpoints. A
+ * user-facing server will expose these endpoints and, in addition, all endpoints that are part of
+ * the public REST API for Kafka Connect; these include the connector creation, connector
+ * status, configuration validation, and logging endpoints. In addition, a user-facing server will
+ * instantiate any user-configured
+ * {@link RestServerConfig#REST_EXTENSION_CLASSES_CONFIG REST extensions}.
+ */
+public abstract class RestServerConfig extends AbstractConfig {
+
+    public static final String LISTENERS_CONFIG = "listeners";
+    private static final String LISTENERS_DOC
+            = "List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.\n" +
+            " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
+            " Leave hostname empty to bind to default interface.\n" +
+            " Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084";
+    // Visible for testing
+    static final List<String> LISTENERS_DEFAULT = Collections.singletonList("http://:8083");
+
+    public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
+    private static final String REST_ADVERTISED_HOST_NAME_DOC
+            = "If this is set, this is the hostname that will be given out to other workers to connect to.";
+
+    public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port";
+    private static final String REST_ADVERTISED_PORT_DOC
+            = "If this is set, this is the port that will be given out to other workers to connect to.";
+
+    public static final String REST_ADVERTISED_LISTENER_CONFIG = "rest.advertised.listener";
+    private static final String REST_ADVERTISED_LISTENER_DOC
+            = "Sets the advertised listener (HTTP or HTTPS) which will be given to other workers to use.";
+
+    public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
+    private static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
+            "Value to set the Access-Control-Allow-Origin header to for REST API requests. " +
+                    "To enable cross origin access, set this to the domain of the application that should be permitted" +
+                    " to access the API, or '*' to allow access from any domain. The default value only allows access" +
+                    " from the domain of the REST API.";
+    protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";
+
+    public static final String ACCESS_CONTROL_ALLOW_METHODS_CONFIG = "access.control.allow.methods";
+    private static final String ACCESS_CONTROL_ALLOW_METHODS_DOC =
+            "Sets the methods supported for cross origin requests by setting the Access-Control-Allow-Methods header. "
+                    + "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD.";
+    private static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = "";
+
+    public static final String ADMIN_LISTENERS_CONFIG = "admin.listeners";
+    private static final String ADMIN_LISTENERS_DOC = "List of comma-separated URIs the Admin REST API will listen on." +
+            " The supported protocols are HTTP and HTTPS." +
+            " An empty or blank string will disable this feature." +
+            " The default behavior is to use the regular listener (specified by the 'listeners' property).";
+    public static final String ADMIN_LISTENERS_HTTPS_CONFIGS_PREFIX = "admin.listeners.https.";
+
+    public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
+    private static final String REST_EXTENSION_CLASSES_DOC =
+            "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
+                    + "in the order specified. Implementing the interface  "
+                    + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources like filters. "
+                    + "Typically used to add custom capability like logging, security, etc. ";
+
+    // Visible for testing
+    static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config";
+    // Visible for testing
+    static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
+    // Visible for testing
+    static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
+    private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
+            Arrays.asList("set", "add", "setDate", "addDate")
+    );
+
+
+    /**
+     * @return the admin listeners to use for this server, or an empty list if no admin endpoints should be exposed,
+     * or null if the admin endpoints should be exposed on the {@link #listeners() regular listeners} for
+     * this server
+     */
+    public abstract List<String> adminListeners();
+
+    /**
+     * @return a list of {@link #REST_EXTENSION_CLASSES_CONFIG REST extension} classes
+     * to instantiate and use with the server
+     */
+    public abstract List<String> restExtensions();
+
+    /**
+     * @return whether {@link WorkerConfig#TOPIC_TRACKING_ENABLE_CONFIG topic tracking}
+     * is enabled on this worker
+     */
+    public abstract boolean topicTrackingEnabled();
+
+    /**
+     * @return whether {@link WorkerConfig#TOPIC_TRACKING_ALLOW_RESET_CONFIG topic tracking resets}
+     * are enabled on this worker
+     */
+    public abstract boolean topicTrackingResetEnabled();
+
+    /**
+     * Add the properties related to a user-facing server to the given {@link ConfigDef}.
+     * <p>
+     * This automatically adds the properties for intra-cluster communication; it is not necessary to
+     * invoke both {@link #addInternalConfig(ConfigDef)} and this method on the same {@link ConfigDef}.
+     * @param configDef the {@link ConfigDef} to add the properties to; may not be null
+     */
+    public static void addPublicConfig(ConfigDef configDef) {
+        addInternalConfig(configDef);
+        configDef
+                .define(
+                        REST_EXTENSION_CLASSES_CONFIG,
+                        ConfigDef.Type.LIST,
+                        "",
+                        ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC
+                ).define(ADMIN_LISTENERS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        null,
+                        new AdminListenersValidator(),
+                        ConfigDef.Importance.LOW,
+                        ADMIN_LISTENERS_DOC);
+    }
+
+    /**
+     * Add the properties related to an internal-only server to the given {@link ConfigDef}.
+     * @param configDef the {@link ConfigDef} to add the properties to; may not be null
+     */
+    public static void addInternalConfig(ConfigDef configDef) {
+        configDef
+                .define(
+                        LISTENERS_CONFIG,
+                        ConfigDef.Type.LIST,
+                        LISTENERS_DEFAULT,
+                        new ListenersValidator(),
+                        ConfigDef.Importance.LOW,
+                        LISTENERS_DOC
+                ).define(
+                        REST_ADVERTISED_HOST_NAME_CONFIG,
+                        ConfigDef.Type.STRING,
+                        null,
+                        ConfigDef.Importance.LOW,
+                        REST_ADVERTISED_HOST_NAME_DOC
+                ).define(
+                        REST_ADVERTISED_PORT_CONFIG,
+                        ConfigDef.Type.INT,
+                        null,
+                        ConfigDef.Importance.LOW,
+                        REST_ADVERTISED_PORT_DOC
+                ).define(
+                        REST_ADVERTISED_LISTENER_CONFIG,
+                        ConfigDef.Type.STRING,
+                        null,
+                        ConfigDef.Importance.LOW,
+                        REST_ADVERTISED_LISTENER_DOC
+                ).define(
+                        ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG,
+                        ConfigDef.Type.STRING,
+                        ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT,
+                        ConfigDef.Importance.LOW,
+                        ACCESS_CONTROL_ALLOW_ORIGIN_DOC
+                ).define(
+                        ACCESS_CONTROL_ALLOW_METHODS_CONFIG,
+                        ConfigDef.Type.STRING,
+                        ACCESS_CONTROL_ALLOW_METHODS_DEFAULT,
+                        ConfigDef.Importance.LOW,
+                        ACCESS_CONTROL_ALLOW_METHODS_DOC
+                ).define(
+                        RESPONSE_HTTP_HEADERS_CONFIG,
+                        ConfigDef.Type.STRING,
+                        RESPONSE_HTTP_HEADERS_DEFAULT,
+                        new ResponseHttpHeadersValidator(),
+                        ConfigDef.Importance.LOW,
+                        RESPONSE_HTTP_HEADERS_DOC
+                ).define(
+                        BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
+                        ConfigDef.Type.STRING,
+                        SslClientAuth.NONE.toString(),
+                        in(Utils.enumOptions(SslClientAuth.class)),
+                        ConfigDef.Importance.LOW,
+                        BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC);
+    }
+
+    public static RestServerConfig forPublic(Integer rebalanceTimeoutMs, Map<?, ?> props) {
+        return new PublicConfig(rebalanceTimeoutMs, props);
+    }
+
+    public static RestServerConfig forInternal(Map<?, ?> props) {
+        return new InternalConfig(props);
+    }
+
+    public List<String> listeners() {
+        return getList(LISTENERS_CONFIG);
+    }
+
+    public String rawListeners() {
+        return (String) originals().get(LISTENERS_CONFIG);
+    }
+
+    public String allowedOrigins() {
+        return getString(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
+    }
+
+    public String allowedMethods() {
+        return getString(ACCESS_CONTROL_ALLOW_METHODS_CONFIG);
+    }
+
+    public String responseHeaders() {
+        return getString(RESPONSE_HTTP_HEADERS_CONFIG);
+    }
+
+    public String advertisedListener() {
+        return getString(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG);
+    }
+
+    public String advertisedHostName() {
+        return getString(REST_ADVERTISED_HOST_NAME_CONFIG);
+    }
+
+    public Integer advertisedPort() {
+        return getInt(REST_ADVERTISED_PORT_CONFIG);
+    }
+
+    public Integer rebalanceTimeoutMs() {
+        return null;
+    }
+
+    protected RestServerConfig(ConfigDef configDef, Map<?, ?> props) {
+        super(configDef, props);
+    }
+
+    // Visible for testing
+    static void validateHttpResponseHeaderConfig(String config) {
+        try {
+            // validate format
+            String[] configTokens = config.trim().split("\\s+", 2);
+            if (configTokens.length != 2) {
+                throw new ConfigException(String.format("Invalid format of header config '%s'. "
+                        + "Expected: '[action] [header name]:[header value]'", config));
+            }
+
+            // validate action
+            String method = configTokens[0].trim();
+            validateHeaderConfigAction(method);
+
+            // validate header name and header value pair
+            String header = configTokens[1];
+            String[] headerTokens = header.trim().split(":");
+            if (headerTokens.length != 2) {
+                throw new ConfigException(
+                        String.format("Invalid format of header name and header value pair '%s'. "
+                                + "Expected: '[header name]:[header value]'", header));
+            }
+
+            // validate header name
+            String headerName = headerTokens[0].trim();
+            if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) {
+                throw new ConfigException(String.format("Invalid header name '%s'. "
+                        + "The '[header name]' cannot contain whitespace", headerName));
+            }
+        } catch (ArrayIndexOutOfBoundsException e) {
+            throw new ConfigException(String.format("Invalid header config '%s'.", config), e);
+        }
+    }
+
+    // Visible for testing
+    static void validateHeaderConfigAction(String action) {
+        if (HEADER_ACTIONS.stream().noneMatch(action::equalsIgnoreCase)) {
+            throw new ConfigException(String.format("Invalid header config action: '%s'. "
+                    + "Expected one of %s", action, HEADER_ACTIONS));
+        }
+    }
+
+    private static class ListenersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (!(value instanceof List)) {
+                throw new ConfigException("Invalid value type for listeners (expected list of URLs , ex: http://localhost:8080,https://localhost:8443).");
+            }
+
+            List<?> items = (List<?>) value;
+            if (items.isEmpty()) {
+                throw new ConfigException("Invalid value for listeners, at least one URL is expected, ex: http://localhost:8080,https://localhost:8443.");
+            }
+
+            for (Object item : items) {
+                if (!(item instanceof String)) {
+                    throw new ConfigException("Invalid type for listeners (expected String).");
+                }
+                if (Utils.isBlank((String) item)) {
+                    throw new ConfigException("Empty URL found when parsing listeners list.");
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
+        }
+    }
+
+    private static class AdminListenersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                return;
+            }
+
+            if (!(value instanceof List)) {
+                throw new ConfigException("Invalid value type for admin.listeners (expected list).");
+            }
+
+            List<?> items = (List<?>) value;
+            if (items.isEmpty()) {
+                return;
+            }
+
+            for (Object item : items) {
+                if (!(item instanceof String)) {
+                    throw new ConfigException("Invalid type for admin.listeners (expected String).");
+                }
+                if (Utils.isBlank((String) item)) {
+                    throw new ConfigException("Empty URL found when parsing admin.listeners list.");
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "List of comma-separated URLs, ex: http://localhost:8080,https://localhost:8443.";
+        }
+    }
+
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (Utils.isBlank(strValue)) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue); // handles and removes surrounding quotes
+            Arrays.stream(configs).forEach(RestServerConfig::validateHttpResponseHeaderConfig);
+        }
+
+        @Override
+        public String toString() {
+            return "Comma-separated header rules, where each header rule is of the form "
+                    + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+                    + "if any part of a header rule contains a comma";
+        }
+    }
+
+    private static class InternalConfig extends RestServerConfig {
+
+        private static ConfigDef config() {
+            ConfigDef result = new ConfigDef().withClientSslSupport();
+            addInternalConfig(result);
+            return result;
+        }
+
+        @Override
+        public List<String> adminListeners() {
+            // Disable admin resources (such as the logging resource)
+            return Collections.emptyList();
+        }
+
+        @Override
+        public List<String> restExtensions() {
+            // Disable the use of REST extensions
+            return null;
+        }
+
+        @Override
+        public boolean topicTrackingEnabled() {
+            // Topic tracking is unnecessary if we don't expose a public REST API
+            return false;
+        }
+
+        @Override
+        public boolean topicTrackingResetEnabled() {
+            // Topic tracking is unnecessary if we don't expose a public REST API
+            return false;
+        }
+
+        public InternalConfig(Map<?, ?> props) {
+            super(config(), props);
+        }
+    }
+
+    private static class PublicConfig extends RestServerConfig {
+
+        private final Integer rebalanceTimeoutMs;
+        private static ConfigDef config() {
+            ConfigDef result = new ConfigDef().withClientSslSupport();
+            addPublicConfig(result);
+            WorkerConfig.addTopicTrackingConfig(result);
+            return result;
+        }
+
+        @Override
+        public List<String> adminListeners() {
+            return getList(ADMIN_LISTENERS_CONFIG);
+        }
+
+        @Override
+        public List<String> restExtensions() {
+            return getList(REST_EXTENSION_CLASSES_CONFIG);
+        }
+
+        @Override
+        public Integer rebalanceTimeoutMs() {
+            return rebalanceTimeoutMs;
+        }
+
+        @Override
+        public boolean topicTrackingEnabled() {
+            return getBoolean(WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG);
+        }
+
+        @Override
+        public boolean topicTrackingResetEnabled() {
+            return getBoolean(WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG);
+        }
+
+        public PublicConfig(Integer rebalanceTimeoutMs, Map<?, ?> props) {
+            super(config(), props);
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index f81e5daf6fd..5e5d97a3696 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -21,19 +21,15 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.core.HttpHeaders;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.RestartRequest;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.distributed.Crypto;
-import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
-import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
-import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
+import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
 import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -67,43 +63,33 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.rest.HerderRequestHandler.Translator;
+import static org.apache.kafka.connect.runtime.rest.HerderRequestHandler.IdentityTranslator;
 
 @Path("/connectors")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
 public class ConnectorsResource implements ConnectResource {
     private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);
-    private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE =
-        new TypeReference<List<Map<String, String>>>() { };
 
     private final Herder herder;
-    private final RestClient restClient;
-    private long requestTimeoutMs;
+    private final HerderRequestHandler requestHandler;
     @javax.ws.rs.core.Context
     private ServletContext context;
     private final boolean isTopicTrackingDisabled;
     private final boolean isTopicTrackingResetDisabled;
 
-    public ConnectorsResource(Herder herder, WorkerConfig config, RestClient restClient) {
+    public ConnectorsResource(Herder herder, RestServerConfig config, RestClient restClient) {
         this.herder = herder;
-        this.restClient = restClient;
-        isTopicTrackingDisabled = !config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
-        isTopicTrackingResetDisabled = !config.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG);
-        this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
+        this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS);
+        this.isTopicTrackingDisabled = !config.topicTrackingEnabled();
+        this.isTopicTrackingResetDisabled = !config.topicTrackingResetEnabled();
     }
 
     @Override
     public void requestTimeout(long requestTimeoutMs) {
-        if (requestTimeoutMs < 1) {
-            throw new IllegalArgumentException("REST request timeout must be positive");
-        }
-        this.requestTimeoutMs = requestTimeoutMs;
+        requestHandler.requestTimeoutMs(requestTimeoutMs);
     }
 
     @GET
@@ -160,7 +146,7 @@ public class ConnectorsResource implements ConnectResource {
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         herder.putConnectorConfig(name, configs, false, cb);
-        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
+        Herder.Created<ConnectorInfo> info = requestHandler.completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
                 new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
 
         URI location = UriBuilder.fromUri("/connectors").path(name).build();
@@ -175,7 +161,7 @@ public class ConnectorsResource implements ConnectResource {
                                       final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
         herder.connectorInfo(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
+        return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
     }
 
     @GET
@@ -186,7 +172,7 @@ public class ConnectorsResource implements ConnectResource {
                                                   final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
         herder.connectorConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
+        return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
     }
 
     @GET
@@ -198,7 +184,7 @@ public class ConnectorsResource implements ConnectResource {
             final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>();
         herder.tasksConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward);
+        return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward);
     }
 
     @GET
@@ -247,7 +233,7 @@ public class ConnectorsResource implements ConnectResource {
         checkAndPutConnectorConfigName(connector, connectorConfig);
 
         herder.putConnectorConfig(connector, connectorConfig, true, cb);
-        Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+        Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
                 "PUT", headers, connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
         Response.ResponseBuilder response;
         if (createdInfo.created()) {
@@ -273,7 +259,7 @@ public class ConnectorsResource implements ConnectResource {
             // For backward compatibility, just restart the connector instance and return OK with no body
             FutureCallback<Void> cb = new FutureCallback<>();
             herder.restartConnector(connector, cb);
-            completeOrForwardRequest(cb, forwardingPath, "POST", headers, null, forward);
+            requestHandler.completeOrForwardRequest(cb, forwardingPath, "POST", headers, null, forward);
             return Response.noContent().build();
         }
 
@@ -283,7 +269,7 @@ public class ConnectorsResource implements ConnectResource {
         Map<String, String> queryParameters = new HashMap<>();
         queryParameters.put("includeTasks", includeTasks.toString());
         queryParameters.put("onlyFailed", onlyFailed.toString());
-        ConnectorStateInfo stateInfo = completeOrForwardRequest(cb, forwardingPath, "POST", headers, queryParameters, null, new TypeReference<ConnectorStateInfo>() {
+        ConnectorStateInfo stateInfo = requestHandler.completeOrForwardRequest(cb, forwardingPath, "POST", headers, queryParameters, null, new TypeReference<ConnectorStateInfo>() {
         }, new IdentityTranslator<>(), forward);
         return Response.accepted().entity(stateInfo).build();
     }
@@ -314,33 +300,7 @@ public class ConnectorsResource implements ConnectResource {
                                          final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
         herder.taskConfigs(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() {
-        }, forward);
-    }
-
-    @POST
-    @Path("/{connector}/tasks")
-    @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
-    public void putTaskConfigs(final @PathParam("connector") String connector,
-                               final @Context HttpHeaders headers,
-                               final @QueryParam("forward") Boolean forward,
-                               final byte[] requestBody) throws Throwable {
-        List<Map<String, String>> taskConfigs = new ObjectMapper().readValue(requestBody, TASK_CONFIGS_TYPE);
-        FutureCallback<Void> cb = new FutureCallback<>();
-        herder.putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
-    }
-
-    @PUT
-    @Path("/{connector}/fence")
-    @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
-    public void fenceZombies(final @PathParam("connector") String connector,
-                             final @Context HttpHeaders headers,
-                             final @QueryParam("forward") Boolean forward,
-                             final byte[] requestBody) throws Throwable {
-        FutureCallback<Void> cb = new FutureCallback<>();
-        herder.fenceZombieSourceTasks(connector, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", "PUT", headers, requestBody, forward);
+        return requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() { }, forward);
     }
 
     @GET
@@ -362,7 +322,7 @@ public class ConnectorsResource implements ConnectResource {
         FutureCallback<Void> cb = new FutureCallback<>();
         ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
         herder.restartTask(taskId, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", headers, null, forward);
+        requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", headers, null, forward);
     }
 
     @DELETE
@@ -373,7 +333,7 @@ public class ConnectorsResource implements ConnectResource {
                                  final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         herder.deleteConnectorConfig(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
+        requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
     }
 
     // Check whether the connector name from the url matches the one (if there is one) provided in the connectorConfig
@@ -388,95 +348,6 @@ public class ConnectorsResource implements ConnectResource {
         }
     }
 
-    // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
-    // request to the leader.
-    private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
-                                              String path,
-                                              String method,
-                                              HttpHeaders headers,
-                                              Map<String, String> queryParameters,
-                                              Object body,
-                                              TypeReference<U> resultType,
-                                              Translator<T, U> translator,
-                                              Boolean forward) throws Throwable {
-        try {
-            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-
-            if (cause instanceof RequestTargetException) {
-                if (restClient == null) {
-                    throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
-                            "Cannot complete request as non-leader with request forwarding disabled");
-                } else if (forward == null || forward) {
-                    // the only time we allow recursive forwarding is when no forward flag has
-                    // been set, which should only be seen by the first worker to handle a user request.
-                    // this gives two total hops to resolve the request before giving up.
-                    boolean recursiveForward = forward == null;
-                    RequestTargetException targetException = (RequestTargetException) cause;
-                    String forwardedUrl = targetException.forwardUrl();
-                    if (forwardedUrl == null) {
-                        // the target didn't know of the leader at this moment.
-                        throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
-                                "Cannot complete request momentarily due to no known leader URL, "
-                                + "likely because a rebalance was underway.");
-                    }
-                    UriBuilder uriBuilder = UriBuilder.fromUri(forwardedUrl)
-                            .path(path)
-                            .queryParam("forward", recursiveForward);
-                    if (queryParameters != null) {
-                        queryParameters.forEach(uriBuilder::queryParam);
-                    }
-                    String forwardUrl = uriBuilder.build().toString();
-                    log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
-                    return translator.translate(restClient.httpRequest(forwardUrl, method, headers, body, resultType));
-                } else {
-                    // we should find the right target for the query within two hops, so if
-                    // we don't, it probably means that a rebalance has taken place.
-                    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
-                            "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
-                }
-            } else if (cause instanceof RebalanceNeededException) {
-                throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
-                        "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
-            }
-
-            throw cause;
-        } catch (TimeoutException e) {
-            // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server
-            // error is the best option
-            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
-        } catch (InterruptedException e) {
-            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
-        }
-    }
-
-    private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
-                                              TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, headers, null, body, resultType, translator, forward);
-    }
-
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
-                                           TypeReference<T> resultType, Boolean forward) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<>(), forward);
-    }
-
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers,
-                                           Object body, Boolean forward) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<>(), forward);
-    }
-
-    private interface Translator<T, U> {
-        T translate(RestClient.HttpResponse<U> response);
-    }
-
-    private static class IdentityTranslator<T> implements Translator<T, T> {
-        @Override
-        public T translate(RestClient.HttpResponse<T> response) {
-            return response.body();
-        }
-    }
-
     private static class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
         @Override
         public Herder.Created<ConnectorInfo> translate(RestClient.HttpResponse<ConnectorInfo> response) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
new file mode 100644
index 00000000000..c7bef991b41
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.v3.oas.annotations.Operation;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.distributed.Crypto;
+import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
+import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriInfo;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Contains endpoints necessary for intra-cluster communication--that is, requests that
+ * workers will issue to each other that originate from within the cluster, as opposed to
+ * requests that originate from a user and are forwarded from one worker to another.
+ */
+@Produces(MediaType.APPLICATION_JSON)
+public abstract class InternalClusterResource implements ConnectResource {
+
+    private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE =
+            new TypeReference<List<Map<String, String>>>() { };
+
+    private final HerderRequestHandler requestHandler;
+
+    // Visible for testing
+    @Context
+    UriInfo uriInfo;
+
+    protected InternalClusterResource(RestClient restClient) {
+        this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS);
+    }
+
+    @Override
+    public void requestTimeout(long requestTimeoutMs) {
+        requestHandler.requestTimeoutMs(requestTimeoutMs);
+    }
+
+    /**
+     * @return a {@link Herder} instance that can be used to satisfy the current request; never null
+     * @throws javax.ws.rs.NotFoundException if no such herder can be provided
+     */
+    protected abstract Herder herderForRequest();
+
+    @POST
+    @Path("/{connector}/tasks")
+    @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
+    public void putTaskConfigs(
+            final @PathParam("connector") String connector,
+            final @Context HttpHeaders headers,
+            final @QueryParam("forward") Boolean forward,
+            final byte[] requestBody) throws Throwable {
+        List<Map<String, String>> taskConfigs = new ObjectMapper().readValue(requestBody, TASK_CONFIGS_TYPE);
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herderForRequest().putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
+        requestHandler.completeOrForwardRequest(
+                cb,
+                uriInfo.getPath(),
+                "POST",
+                headers,
+                taskConfigs,
+                forward
+        );
+    }
+
+    @PUT
+    @Path("/{connector}/fence")
+    @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
+    public void fenceZombies(
+            final @PathParam("connector") String connector,
+            final @Context HttpHeaders headers,
+            final @QueryParam("forward") Boolean forward,
+            final byte[] requestBody) throws Throwable {
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herderForRequest().fenceZombieSourceTasks(connector, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
+        requestHandler.completeOrForwardRequest(
+                cb,
+                uriInfo.getPath(),
+                "PUT",
+                headers,
+                requestBody,
+                forward
+        );
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
new file mode 100644
index 00000000000..c4987150dfb
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+
+import javax.ws.rs.Path;
+
+@Path("/connectors")
+public class InternalConnectResource extends InternalClusterResource {
+
+    private final Herder herder;
+
+    public InternalConnectResource(Herder herder, RestClient restClient) {
+        super(restClient);
+        this.herder = herder;
+    }
+
+    @Override
+    protected Herder herderForRequest() {
+        return herder;
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
index c913ffe747b..efb184849f8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.connect.runtime.rest.util;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.RestClient;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -40,7 +40,7 @@ public class SSLUtils {
     /**
      * Configures SSL/TLS for HTTPS Jetty Server using configs with the given prefix
      */
-    public static SslContextFactory createServerSideSslContextFactory(WorkerConfig config, String prefix) {
+    public static SslContextFactory createServerSideSslContextFactory(AbstractConfig config, String prefix) {
         Map<String, Object> sslConfigValues = config.valuesWithPrefixAllOrNothing(prefix);
 
         final SslContextFactory.Server ssl = new SslContextFactory.Server();
@@ -56,14 +56,14 @@ public class SSLUtils {
     /**
      * Configures SSL/TLS for HTTPS Jetty Server
      */
-    public static SslContextFactory createServerSideSslContextFactory(WorkerConfig config) {
+    public static SslContextFactory createServerSideSslContextFactory(AbstractConfig config) {
         return createServerSideSslContextFactory(config, "listeners.https.");
     }
 
     /**
      * Configures SSL/TLS for HTTPS Jetty Client
      */
-    public static SslContextFactory createClientSideSslContextFactory(WorkerConfig config) {
+    public static SslContextFactory createClientSideSslContextFactory(AbstractConfig config) {
         Map<String, Object> sslConfigValues = config.valuesWithPrefixAllOrNothing("listeners.https.");
 
         final SslContextFactory.Client ssl = new SslContextFactory.Client();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index 6ec86bd9342..a0f993f2f63 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -44,7 +44,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
 import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
-import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG;
+import static org.apache.kafka.connect.runtime.rest.RestServerConfig.REST_EXTENSION_CLASSES_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
index 8046f411010..7c9e2f2c51e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestForwardingIntegrationTest.java
@@ -34,8 +34,9 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.ConnectRestServer;
 import org.apache.kafka.connect.runtime.rest.RestClient;
-import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
@@ -80,10 +81,10 @@ public class RestForwardingIntegrationTest {
     private Map<String, Object> sslConfig;
     @Mock
     private Plugins plugins;
-    private RestServer followerServer;
+    private ConnectRestServer followerServer;
     @Mock
     private Herder followerHerder;
-    private RestServer leaderServer;
+    private ConnectRestServer leaderServer;
     @Mock
     private Herder leaderHerder;
 
@@ -158,14 +159,14 @@ public class RestForwardingIntegrationTest {
 
         // Follower worker setup
         RestClient followerClient = new RestClient(followerConfig);
-        followerServer = new RestServer(followerConfig, followerClient);
+        followerServer = new ConnectRestServer(null, followerClient, followerConfig.originals());
         followerServer.initializeServer();
         when(followerHerder.plugins()).thenReturn(plugins);
         followerServer.initializeResources(followerHerder);
 
         // Leader worker setup
         RestClient leaderClient = new RestClient(leaderConfig);
-        leaderServer = new RestServer(leaderConfig, leaderClient);
+        leaderServer = new ConnectRestServer(null, leaderClient, leaderConfig.originals());
         leaderServer.initializeServer();
         when(leaderHerder.plugins()).thenReturn(plugins);
         leaderServer.initializeResources(leaderHerder);
@@ -235,13 +236,13 @@ public class RestForwardingIntegrationTest {
             }
         }
         if (dualListener) {
-            workerProps.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:0, https://localhost:0");
+            workerProps.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:0, https://localhost:0");
             // This server is brought up with both a plaintext and an SSL listener; we use this property
             // to dictate which URL it advertises to other servers when a request must be forwarded to it
             // and which URL we issue requests against during testing
-            workerProps.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, advertiseSSL ? "https" : "http");
+            workerProps.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, advertiseSSL ? "https" : "http");
         } else {
-            workerProps.put(WorkerConfig.LISTENERS_CONFIG, advertiseSSL ? "https://localhost:0" : "http://localhost:0");
+            workerProps.put(RestServerConfig.LISTENERS_CONFIG, advertiseSSL ? "https://localhost:0" : "http://localhost:0");
         }
 
         return workerProps;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
index ec9a843d4da..dbb25cdafac 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
@@ -19,8 +19,6 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.After;
 import org.junit.Before;
@@ -33,40 +31,14 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
 
-import static org.apache.kafka.connect.runtime.WorkerConfig.LISTENERS_DEFAULT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 
 public class WorkerConfigTest {
-    private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
-            "add \t Cache-Control: no-cache, no-store, must-revalidate",
-            "add \r X-XSS-Protection: 1; mode=block",
-            "\n add Strict-Transport-Security: max-age=31536000; includeSubDomains",
-            "AdD   Strict-Transport-Security:  \r  max-age=31536000;  includeSubDomains",
-            "AdD \t Strict-Transport-Security : \n   max-age=31536000;  includeSubDomains",
-            "add X-Content-Type-Options: \r nosniff",
-            "Set \t X-Frame-Options: \t Deny\n ",
-            "seT \t X-Cache-Info: \t not cacheable\n ",
-            "seTDate \t Expires: \r 31540000000",
-            "adDdate \n Last-Modified: \t 0"
-    );
-
-    private static final List<String> INVALID_HEADER_CONFIGS = Arrays.asList(
-            "set \t",
-            "badaction \t X-Frame-Options:DENY",
-            "set add X-XSS-Protection:1",
-            "addX-XSS-Protection",
-            "X-XSS-Protection:",
-            "add set X-XSS-Protection: 1",
-            "add X-XSS-Protection:1 X-XSS-Protection:1 ",
-            "add X-XSS-Protection",
-            "set X-Frame-Options:DENY, add  :no-cache, no-store, must-revalidate "
-    );
 
     private static final String CLUSTER_ID = "cluster-id";
     private MockedStatic<WorkerConfig> workerConfigMockedStatic;
@@ -82,105 +54,6 @@ public class WorkerConfigTest {
         workerConfigMockedStatic.close();
     }
 
-    @Test
-    public void testListenersConfigAllowedValues() {
-        Map<String, String> props = baseProps();
-
-        // no value set for "listeners"
-        WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
-        assertEquals(LISTENERS_DEFAULT, config.getList(WorkerConfig.LISTENERS_CONFIG));
-
-        props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999");
-        config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
-        assertEquals(Arrays.asList("http://a.b:9999"), config.getList(WorkerConfig.LISTENERS_CONFIG));
-
-        props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
-        config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
-        assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.getList(WorkerConfig.LISTENERS_CONFIG));
-
-        new WorkerConfig(WorkerConfig.baseConfigDef(), props);
-    }
-
-    @Test
-    public void testListenersConfigNotAllowedValues() {
-        Map<String, String> props = baseProps();
-        assertEquals(LISTENERS_DEFAULT, new WorkerConfig(WorkerConfig.baseConfigDef(), props).getList(WorkerConfig.LISTENERS_CONFIG));
-
-        props.put(WorkerConfig.LISTENERS_CONFIG, "");
-        ConfigException ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
-        assertTrue(ce.getMessage().contains(" listeners"));
-
-        props.put(WorkerConfig.LISTENERS_CONFIG, ",,,");
-        ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
-        assertTrue(ce.getMessage().contains(" listeners"));
-
-        props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999,");
-        ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
-        assertTrue(ce.getMessage().contains(" listeners"));
-
-        props.put(WorkerConfig.LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999");
-        ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
-        assertTrue(ce.getMessage().contains(" listeners"));
-    }
-
-    @Test
-    public void testAdminListenersConfigAllowedValues() {
-        Map<String, String> props = baseProps();
-
-        // no value set for "admin.listeners"
-        WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
-        assertNull("Default value should be null.", config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG));
-
-        props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "");
-        config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
-        assertTrue(config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG).isEmpty());
-
-        props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
-        config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
-        assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.getList(WorkerConfig.ADMIN_LISTENERS_CONFIG));
-
-        new WorkerConfig(WorkerConfig.baseConfigDef(), props);
-    }
-
-    @Test
-    public void testAdminListenersNotAllowingEmptyStrings() {
-        Map<String, String> props = baseProps();
-
-        props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999,");
-        ConfigException ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
-        assertTrue(ce.getMessage().contains(" admin.listeners"));
-    }
-
-    @Test
-    public void testAdminListenersNotAllowingBlankStrings() {
-        Map<String, String> props = baseProps();
-        props.put(WorkerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999");
-        assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
-    }
-
-    @Test
-    public void testInvalidHeaderConfigs() {
-        for (String config : INVALID_HEADER_CONFIGS) {
-            assertInvalidHeaderConfig(config);
-        }
-    }
-
-    @Test
-    public void testValidHeaderConfigs() {
-        for (String config : VALID_HEADER_CONFIGS) {
-            assertValidHeaderConfig(config);
-        }
-    }
-
-    @Test
-    public void testInvalidSslClientAuthConfig() {
-        Map<String, String> props = baseProps();
-
-        props.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "abc");
-        ConfigException ce = assertThrows(ConfigException.class, () -> new WorkerConfig(WorkerConfig.baseConfigDef(), props));
-        assertTrue(ce.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG));
-    }
-
     @Test
     public void testLookupKafkaClusterId() {
         final Node broker1 = new Node(0, "dummyHost-1", 1234);
@@ -225,14 +98,6 @@ public class WorkerConfigTest {
         workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1));
     }
 
-    private void assertInvalidHeaderConfig(String config) {
-        assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config));
-    }
-
-    private void assertValidHeaderConfig(String config) {
-        WorkerConfig.validateHttpResponseHeaderConfig(config);
-    }
-
     private Map<String, String> baseProps() {
         Map<String, String> props = new HashMap<>();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index e387847e6be..99dffe17d9d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -244,7 +244,7 @@ public class DistributedHerderTest {
                 new String[]{"connectorType", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig", "buildRestartPlan", "recordRestarting"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
-                new AutoCloseable[]{uponShutdown});
+                Collections.emptyList(), new AutoCloseable[]{uponShutdown});
 
         configUpdateListener = herder.new ConfigUpdateListener();
         rebalanceListener = herder.new RebalanceListener(time);
@@ -4020,7 +4020,7 @@ public class DistributedHerderTest {
                 new String[]{"connectorType", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
                 new DistributedConfig(config), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, restClient, metrics, time, noneConnectorClientConfigOverridePolicy,
-                new AutoCloseable[0]);
+                Collections.emptyList(), new AutoCloseable[0]);
     }
 
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 4b040de92c4..ee628e113e6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.ConverterConfig;
 import org.apache.kafka.connect.storage.ConverterType;
@@ -140,12 +141,13 @@ public class PluginsTest {
 
     @Test
     public void shouldInstantiateAndConfigureConnectRestExtension() {
-        props.put(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG,
+        props.clear();
+        props.put(RestServerConfig.REST_EXTENSION_CLASSES_CONFIG,
                   TestConnectRestExtension.class.getName());
-        createConfig();
+        config = RestServerConfig.forPublic(null, props);
 
         List<ConnectRestExtension> connectRestExtensions =
-            plugins.newPlugins(config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+            plugins.newPlugins(config.getList(RestServerConfig.REST_EXTENSION_CLASSES_CONFIG),
                                config,
                                ConnectRestExtension.class);
         assertNotNull(connectRestExtensions);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
similarity index 70%
rename from connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
rename to connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
index e0c5b22c90e..ad5c88f9aeb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
@@ -29,14 +29,11 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.BasicResponseHandler;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -55,20 +52,21 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.kafka.connect.runtime.WorkerConfig.ADMIN_LISTENERS_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-public class RestServerTest {
+public class ConnectRestServerTest {
 
     private Herder herder;
     private Plugins plugins;
-    private RestServer server;
+    private ConnectRestServer server;
     private CloseableHttpClient httpClient;
     private Collection<CloseableHttpResponse> responses = new ArrayList<>();
 
@@ -94,18 +92,10 @@ public class RestServerTest {
         }
     }
 
-    private Map<String, String> baseWorkerProps() {
-        Map<String, String> workerProps = new HashMap<>();
-        workerProps.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
-        workerProps.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
-        workerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        workerProps.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
-        workerProps.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
-        workerProps.put(WorkerConfig.LISTENERS_CONFIG, "HTTP://localhost:0");
-
-        return workerProps;
+    private Map<String, String> baseServerProps() {
+        Map<String, String> configMap = new HashMap<>();
+        configMap.put(RestServerConfig.LISTENERS_CONFIG, "HTTP://localhost:0");
+        return configMap;
     }
 
     @Test
@@ -121,65 +111,60 @@ public class RestServerTest {
     @Test
     public void testAdvertisedUri() {
         // Advertised URI from listeners without protocol
-        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
-        configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
-        DistributedConfig config = new DistributedConfig(configMap);
+        Map<String, String> configMap = new HashMap<>(baseServerProps());
+        configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
 
-        server = new RestServer(config, null);
+        server = new ConnectRestServer(null, null, configMap);
         Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString());
         server.stop();
 
         // Advertised URI from listeners with protocol
-        configMap = new HashMap<>(baseWorkerProps());
-        configMap.put(WorkerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
-        configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https");
-        config = new DistributedConfig(configMap);
+        configMap = new HashMap<>(baseServerProps());
+        configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443");
+        configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https");
 
-        server = new RestServer(config, null);
+        server = new ConnectRestServer(null, null, configMap);
         Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
         server.stop();
 
         // Advertised URI from listeners with only SSL available
-        configMap = new HashMap<>(baseWorkerProps());
-        configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
-        config = new DistributedConfig(configMap);
+        configMap = new HashMap<>(baseServerProps());
+        configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443");
 
-        server = new RestServer(config, null);
+        server = new ConnectRestServer(null, null, configMap);
         Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString());
         server.stop();
 
         // Listener is overriden by advertised values
-        configMap = new HashMap<>(baseWorkerProps());
-        configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://localhost:8443");
-        configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
-        configMap.put(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost");
-        configMap.put(WorkerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
-        config = new DistributedConfig(configMap);
-
-        server = new RestServer(config, null);
+        configMap = new HashMap<>(baseServerProps());
+        configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443");
+        configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
+        configMap.put(RestServerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost");
+        configMap.put(RestServerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
+
+        server = new ConnectRestServer(null, null, configMap);
         Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString());
         server.stop();
 
         // correct listener is chosen when https listener is configured before http listener and advertised listener is http
-        configMap = new HashMap<>(baseWorkerProps());
-        configMap.put(WorkerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761");
-        configMap.put(WorkerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
-        config = new DistributedConfig(configMap);
-        server = new RestServer(config, null);
+        configMap = new HashMap<>(baseServerProps());
+        configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761");
+        configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http");
+
+        server = new ConnectRestServer(null, null, configMap);
         Assert.assertEquals("http://plaintext-localhost:4761/", server.advertisedUrl().toString());
         server.stop();
     }
 
     @Test
     public void testOptionsDoesNotIncludeWadlOutput() throws IOException {
-        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
-        DistributedConfig workerConfig = new DistributedConfig(configMap);
+        Map<String, String> configMap = new HashMap<>(baseServerProps());
 
         doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
         doReturn(plugins).when(herder).plugins();
-        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+        expectEmptyRestExtensions();
 
-        server = new RestServer(workerConfig, null);
+        server = new ConnectRestServer(null, null, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -197,17 +182,16 @@ public class RestServerTest {
 
     public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method)
         throws IOException {
-        Map<String, String> workerProps = baseWorkerProps();
-        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
-        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
-        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+        Map<String, String> configMap = baseServerProps();
+        configMap.put(RestServerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
+        configMap.put(RestServerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
 
         doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
         doReturn(plugins).when(herder).plugins();
-        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+        expectEmptyRestExtensions();
         doReturn(Arrays.asList("a", "b")).when(herder).connectors();
 
-        server = new RestServer(workerConfig, null);
+        server = new ConnectRestServer(null, null, configMap);
         server.initializeServer();
         server.initializeResources(herder);
         URI serverUrl = server.advertisedUrl();
@@ -242,16 +226,15 @@ public class RestServerTest {
 
     @Test
     public void testStandaloneConfig() throws IOException  {
-        Map<String, String> workerProps = baseWorkerProps();
-        workerProps.put("offset.storage.file.filename", "/tmp");
-        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+        Map<String, String> configMap = baseServerProps();
+        configMap.put("offset.storage.file.filename", "/tmp");
 
         doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
         doReturn(plugins).when(herder).plugins();
-        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+        expectEmptyRestExtensions();
         doReturn(Arrays.asList("a", "b")).when(herder).connectors();
 
-        server = new RestServer(workerConfig, null);
+        server = new ConnectRestServer(null, null, configMap);
         server.initializeServer();
         server.initializeResources(herder);
         HttpRequest request = new HttpGet("/connectors");
@@ -262,17 +245,16 @@ public class RestServerTest {
 
     @Test
     public void testLoggersEndpointWithDefaults() throws IOException {
-        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
-        DistributedConfig workerConfig = new DistributedConfig(configMap);
+        Map<String, String> configMap = new HashMap<>(baseServerProps());
 
         doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
         doReturn(plugins).when(herder).plugins();
-        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+        expectEmptyRestExtensions();
 
         // create some loggers in the process
         LoggerFactory.getLogger("a.b.c.s.W");
 
-        server = new RestServer(workerConfig, null);
+        server = new ConnectRestServer(null, null, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -292,14 +274,12 @@ public class RestServerTest {
 
     @Test
     public void testIndependentAdminEndpoint() throws IOException {
-        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
-        configMap.put(ADMIN_LISTENERS_CONFIG, "http://localhost:0");
-
-        DistributedConfig workerConfig = new DistributedConfig(configMap);
+        Map<String, String> configMap = new HashMap<>(baseServerProps());
+        configMap.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://localhost:0");
 
         doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
         doReturn(plugins).when(herder).plugins();
-        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+        expectEmptyRestExtensions();
 
         // create some loggers in the process
         LoggerFactory.getLogger("a.b.c.s.W");
@@ -307,7 +287,7 @@ public class RestServerTest {
         LoggerFactory.getLogger("a.b.c.p.Y");
         LoggerFactory.getLogger("a.b.c.p.Z");
 
-        server = new RestServer(workerConfig, null);
+        server = new ConnectRestServer(null, null, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -322,16 +302,14 @@ public class RestServerTest {
 
     @Test
     public void testDisableAdminEndpoint() throws IOException {
-        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
-        configMap.put(ADMIN_LISTENERS_CONFIG, "");
-
-        DistributedConfig workerConfig = new DistributedConfig(configMap);
+        Map<String, String> configMap = new HashMap<>(baseServerProps());
+        configMap.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "");
 
         doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
         doReturn(plugins).when(herder).plugins();
-        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+        expectEmptyRestExtensions();
 
-        server = new RestServer(workerConfig, null);
+        server = new ConnectRestServer(null, null, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -344,14 +322,13 @@ public class RestServerTest {
 
     @Test
     public void testRequestLogs() throws IOException, InterruptedException {
-        Map<String, String> configMap = new HashMap<>(baseWorkerProps());
-        DistributedConfig workerConfig = new DistributedConfig(configMap);
+        Map<String, String> configMap = new HashMap<>(baseServerProps());
 
         doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
         doReturn(plugins).when(herder).plugins();
-        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+        expectEmptyRestExtensions();
 
-        server = new RestServer(workerConfig, null);
+        server = new ConnectRestServer(null, null, configMap);
         server.initializeServer();
         server.initializeResources(herder);
 
@@ -388,17 +365,16 @@ public class RestServerTest {
 
     private void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
             throws IOException  {
-        Map<String, String> workerProps = baseWorkerProps();
-        workerProps.put("offset.storage.file.filename", "/tmp");
-        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
-        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+        Map<String, String> configMap = baseServerProps();
+        configMap.put("offset.storage.file.filename", "/tmp");
+        configMap.put(RestServerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
 
         doReturn(KAFKA_CLUSTER_ID).when(herder).kafkaClusterId();
         doReturn(plugins).when(herder).plugins();
-        doReturn(Collections.emptyList()).when(plugins).newPlugins(Collections.emptyList(), workerConfig, ConnectRestExtension.class);
+        expectEmptyRestExtensions();
         doReturn(Arrays.asList("a", "b")).when(herder).connectors();
 
-        server = new RestServer(workerConfig, null);
+        server = new ConnectRestServer(null, null, configMap);
         server.initializeServer();
         server.initializeResources(herder);
         HttpRequest request = new HttpGet("/connectors");
@@ -442,4 +418,12 @@ public class RestServerTest {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(map);
     }
+
+    // Stubs plugins.newPlugins to return no REST extensions for an empty class list and any config.
+    private void expectEmptyRestExtensions() {
+        doReturn(Collections.emptyList())
+                .when(plugins)
+                .newPlugins(eq(Collections.emptyList()), any(AbstractConfig.class),
+                        eq(ConnectRestExtension.class));
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
new file mode 100644
index 00000000000..28dd725afd4
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerConfigTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class RestServerConfigTest {
+
+    private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
+            "add \t Cache-Control: no-cache, no-store, must-revalidate",
+            "add \r X-XSS-Protection: 1; mode=block",
+            "\n add Strict-Transport-Security: max-age=31536000; includeSubDomains",
+            "AdD   Strict-Transport-Security:  \r  max-age=31536000;  includeSubDomains",
+            "AdD \t Strict-Transport-Security : \n   max-age=31536000;  includeSubDomains",
+            "add X-Content-Type-Options: \r nosniff",
+            "Set \t X-Frame-Options: \t Deny\n ",
+            "seT \t X-Cache-Info: \t not cacheable\n ",
+            "seTDate \t Expires: \r 31540000000",
+            "adDdate \n Last-Modified: \t 0"
+    );
+
+    private static final List<String> INVALID_HEADER_CONFIGS = Arrays.asList(
+            "set \t",
+            "badaction \t X-Frame-Options:DENY",
+            "set add X-XSS-Protection:1",
+            "addX-XSS-Protection",
+            "X-XSS-Protection:",
+            "add set X-XSS-Protection: 1",
+            "add X-XSS-Protection:1 X-XSS-Protection:1 ",
+            "add X-XSS-Protection",
+            "set X-Frame-Options:DENY, add  :no-cache, no-store, must-revalidate "
+    );
+
+    @Test
+    public void testListenersConfigAllowedValues() {
+        Map<String, String> props = new HashMap<>();
+
+        // no value set for "listeners"
+        RestServerConfig config = RestServerConfig.forPublic(null, props);
+        assertEquals(LISTENERS_DEFAULT, config.listeners());
+
+        // a single listener
+        props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999");
+        config = RestServerConfig.forPublic(null, props);
+        assertEquals(Arrays.asList("http://a.b:9999"), config.listeners());
+
+        // several listeners; the whitespace after the comma must not end up in the parsed entries
+        props.put(RestServerConfig.LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
+        config = RestServerConfig.forPublic(null, props);
+        assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.listeners());
+    }
+
+    /**
+     * With "listeners" unset the default applies; a value that is empty or holds an empty/blank
+     * entry is expected to fail with a ConfigException whose message contains " listeners".
+     */
+    @Test
+    public void testListenersConfigNotAllowedValues() {
+        Map<String, String> props = new HashMap<>();
+        assertEquals(LISTENERS_DEFAULT, RestServerConfig.forPublic(null, props).listeners());
+
+        List<String> rejectedValues = Arrays.asList(
+                "",
+                ",,,",
+                "http://a.b:9999,",
+                "http://a.b:9999, ,https://a.b:9999"
+        );
+        for (String rejected : rejectedValues) {
+            props.put(RestServerConfig.LISTENERS_CONFIG, rejected);
+            ConfigException ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+            assertTrue(ce.getMessage().contains(" listeners"));
+        }
+    }
+
+    @Test
+    public void testAdminListenersConfigAllowedValues() {
+        Map<String, String> props = new HashMap<>();
+
+        // no value set for "admin.listeners"
+        RestServerConfig config = RestServerConfig.forPublic(null, props);
+        assertNull("Default value should be null.", config.adminListeners());
+
+        // an explicitly empty value is accepted and yields an empty list rather than null
+        props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "");
+        config = RestServerConfig.forPublic(null, props);
+        assertTrue(config.adminListeners().isEmpty());
+
+        // several listeners; the whitespace after the comma must not end up in the parsed entries
+        props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, https://a.b:7812");
+        config = RestServerConfig.forPublic(null, props);
+        assertEquals(Arrays.asList("http://a.b:9999", "https://a.b:7812"), config.adminListeners());
+    }
+
+    @Test
+    public void testAdminListenersNotAllowingEmptyStrings() {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999,");
+        ConfigException ce = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+        assertTrue(ce.getMessage().contains(" admin.listeners"));
+    }
+
+    @Test
+    public void testAdminListenersNotAllowingBlankStrings() {
+        Map<String, String> props = new HashMap<>();
+        props.put(RestServerConfig.ADMIN_LISTENERS_CONFIG, "http://a.b:9999, ,https://a.b:9999");
+        assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, props));
+    }
+
+    /** Every entry of INVALID_HEADER_CONFIGS is expected to be rejected with a ConfigException. */
+    @Test
+    public void testInvalidHeaderConfigs() {
+        // entries are checked in list order; the first rule that is not rejected fails the test
+        INVALID_HEADER_CONFIGS.forEach(this::assertInvalidHeaderConfig);
+    }
+
+    /** Every entry of VALID_HEADER_CONFIGS is expected to pass validation without throwing. */
+    @Test
+    public void testValidHeaderConfigs() {
+        // a rejected rule surfaces as an exception from the validator, so no explicit assert is needed
+        VALID_HEADER_CONFIGS.forEach(this::assertValidHeaderConfig);
+    }
+
+    private void assertInvalidHeaderConfig(String config) {
+        assertThrows(ConfigException.class, () -> RestServerConfig.validateHttpResponseHeaderConfig(config));
+    }
+
+    private void assertValidHeaderConfig(String config) {
+        RestServerConfig.validateHttpResponseHeaderConfig(config);
+    }
+
+    @Test
+    public void testInvalidSslClientAuthConfig() {
+        // "abc" is expected to be rejected, and the error should name the offending property
+        Map<String, String> serverProps = new HashMap<>();
+        serverProps.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "abc");
+        ConfigException thrown = assertThrows(ConfigException.class, () -> RestServerConfig.forPublic(null, serverProps));
+        assertTrue(thrown.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG));
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 781a09d72ae..26327770e0c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;
 
-import javax.crypto.Mac;
 import javax.ws.rs.core.HttpHeaders;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -26,12 +25,11 @@ import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.RestartRequest;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
-import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
 import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -61,7 +59,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -70,8 +67,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG;
-import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -153,15 +148,15 @@ public class ConnectorsResourceTest {
     private ConnectorsResource connectorsResource;
     private UriInfo forward;
     @Mock
-    private WorkerConfig workerConfig;
-    @Mock
     private RestClient restClient;
+    @Mock
+    private RestServerConfig serverConfig;
 
     @Before
     public void setUp() throws NoSuchMethodException {
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
-        connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+        when(serverConfig.topicTrackingEnabled()).thenReturn(true);
+        when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
         forward = mock(UriInfo.class);
         MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
         queryParams.putSingle("forward", "true");
@@ -565,66 +560,6 @@ public class ConnectorsResourceTest {
         assertThrows(NotFoundException.class, () -> connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD));
     }
 
-    @Test
-    public void testPutConnectorTaskConfigsNoInternalRequestSignature() throws Throwable {
-        final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
-        expectAndCallbackResult(cb, null).when(herder).putTaskConfigs(
-            eq(CONNECTOR_NAME),
-            eq(TASK_CONFIGS),
-            cb.capture(),
-            any()
-        );
-
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(TASK_CONFIGS));
-    }
-
-    @Test
-    public void testPutConnectorTaskConfigsWithInternalRequestSignature() throws Throwable {
-        final String signatureAlgorithm = "HmacSHA256";
-        final String encodedSignature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4=";
-
-        final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
-        final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class);
-        expectAndCallbackResult(cb, null).when(herder).putTaskConfigs(
-            eq(CONNECTOR_NAME),
-            eq(TASK_CONFIGS),
-            cb.capture(),
-            signatureCapture.capture()
-        );
-
-        HttpHeaders headers = mock(HttpHeaders.class);
-        when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER))
-            .thenReturn(signatureAlgorithm);
-        when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER))
-            .thenReturn(encodedSignature);
-
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, headers, FORWARD, serializeAsBytes(TASK_CONFIGS));
-
-        InternalRequestSignature expectedSignature = new InternalRequestSignature(
-            serializeAsBytes(TASK_CONFIGS),
-            Mac.getInstance(signatureAlgorithm),
-            Base64.getDecoder().decode(encodedSignature)
-        );
-        assertEquals(
-            expectedSignature,
-            signatureCapture.getValue()
-        );
-    }
-
-    @Test
-    public void testPutConnectorTaskConfigsConnectorNotFound() {
-        final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
-        expectAndCallbackException(cb, new NotFoundException("not found")).when(herder).putTaskConfigs(
-            eq(CONNECTOR_NAME),
-            eq(TASK_CONFIGS),
-            cb.capture(),
-            any()
-        );
-
-        assertThrows(NotFoundException.class, () -> connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS,
-            FORWARD, serializeAsBytes(TASK_CONFIGS)));
-    }
-
     @Test
     public void testRestartConnectorAndTasksConnectorNotFound() {
         RestartRequest restartRequest = new RestartRequest(CONNECTOR_NAME, true, false);
@@ -683,55 +618,6 @@ public class ConnectorsResourceTest {
         assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus());
     }
 
-    @Test
-    public void testFenceZombiesNoInternalRequestSignature() throws Throwable {
-        final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
-        expectAndCallbackResult(cb, null)
-            .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), isNull());
-
-        connectorsResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null));
-    }
-
-    @Test
-    public void testFenceZombiesWithInternalRequestSignature() throws Throwable {
-        final String signatureAlgorithm = "HmacSHA256";
-        final String encodedSignature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4=";
-
-        final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
-        final ArgumentCaptor<InternalRequestSignature> signatureCapture = ArgumentCaptor.forClass(InternalRequestSignature.class);
-        expectAndCallbackResult(cb, null)
-            .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), signatureCapture.capture());
-
-        HttpHeaders headers = mock(HttpHeaders.class);
-        when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER))
-            .thenReturn(signatureAlgorithm);
-        when(headers.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER))
-            .thenReturn(encodedSignature);
-
-        connectorsResource.fenceZombies(CONNECTOR_NAME, headers, FORWARD, serializeAsBytes(null));
-
-        InternalRequestSignature expectedSignature = new InternalRequestSignature(
-                serializeAsBytes(null),
-                Mac.getInstance(signatureAlgorithm),
-                Base64.getDecoder().decode(encodedSignature)
-        );
-        assertEquals(
-                expectedSignature,
-                signatureCapture.getValue()
-        );
-    }
-
-    @Test
-    public void testFenceZombiesConnectorNotFound() throws Throwable {
-        final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
-
-        expectAndCallbackException(cb, new NotFoundException("not found"))
-            .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), cb.capture(), any());
-
-        assertThrows(NotFoundException.class,
-                () -> connectorsResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null)));
-    }
-
     @Test
     public void testRestartConnectorNotFound() {
         final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
@@ -806,9 +692,9 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testConnectorActiveTopicsWithTopicTrackingDisabled() {
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false);
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false);
-        connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+        when(serverConfig.topicTrackingEnabled()).thenReturn(false);
+        when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
 
         Exception e = assertThrows(ConnectRestException.class,
             () -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME));
@@ -817,10 +703,10 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testResetConnectorActiveTopicsWithTopicTrackingDisabled() {
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(false);
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
+        when(serverConfig.topicTrackingEnabled()).thenReturn(false);
+        when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
         HttpHeaders headers = mock(HttpHeaders.class);
-        connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
 
         Exception e = assertThrows(ConnectRestException.class,
             () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@@ -829,10 +715,10 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testResetConnectorActiveTopicsWithTopicTrackingEnabled() {
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(false);
+        when(serverConfig.topicTrackingEnabled()).thenReturn(true);
+        when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
         HttpHeaders headers = mock(HttpHeaders.class);
-        connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
 
         Exception e = assertThrows(ConnectRestException.class,
             () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@@ -841,11 +727,11 @@ public class ConnectorsResourceTest {
 
     @Test
     public void testConnectorActiveTopics() {
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG)).thenReturn(true);
-        when(workerConfig.getBoolean(TOPIC_TRACKING_ALLOW_RESET_CONFIG)).thenReturn(true);
+        when(serverConfig.topicTrackingEnabled()).thenReturn(true);
+        when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
         when(herder.connectorActiveTopics(CONNECTOR_NAME))
             .thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS));
-        connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
 
         Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
         assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
@@ -858,7 +744,7 @@ public class ConnectorsResourceTest {
     @Test
     public void testResetConnectorActiveTopics() {
         HttpHeaders headers = mock(HttpHeaders.class);
-        connectorsResource = new ConnectorsResource(herder, workerConfig, restClient);
+        connectorsResource = new ConnectorsResource(herder, serverConfig, restClient);
 
         Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers);
         verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
new file mode 100644
index 00000000000..5bf33bf5300
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
+import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.util.Callback;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Stubber;
+
+import javax.crypto.Mac;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.UriInfo;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class InternalConnectResourceTest {
+
+    private static final Boolean FORWARD = true;
+    private static final String CONNECTOR_NAME = "test";
+    private static final HttpHeaders NULL_HEADERS = null;
+    private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>();
+    static {
+        TASK_CONFIGS.add(Collections.singletonMap("config", "value"));
+        TASK_CONFIGS.add(Collections.singletonMap("config", "other_value"));
+    }
+    private static final String FENCE_PATH = "/connectors/" + CONNECTOR_NAME + "/fence";
+    private static final String TASK_CONFIGS_PATH = "/connectors/" + CONNECTOR_NAME + "/tasks";
+
+    @Mock
+    private UriInfo uriInfo;
+    @Mock
+    private Herder herder;
+    @Mock
+    private RestClient restClient;
+
+    private InternalConnectResource internalResource;
+
+    @Before
+    public void setup() {
+        internalResource = new InternalConnectResource(herder, restClient); // resource under test, built around the mocked herder and REST client
+        internalResource.uriInfo = uriInfo; // assign the mocked UriInfo directly so expectRequestPath() can stub the request path
+    }
+
+    @Test
+    public void testPutConnectorTaskConfigsNoInternalRequestSignature() throws Throwable {
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Callback<Void>> cb = ArgumentCaptor.forClass(Callback.class);
+        expectAndCallbackResult(cb, null).when(herder).putTaskConfigs( // herder completes the captured callback successfully
+                eq(CONNECTOR_NAME),
+                eq(TASK_CONFIGS),
+                cb.capture(),
+                isNull() // NULL_HEADERS carries no signature: require null here, as testFenceZombiesNoInternalRequestSignature does
+        );
+        expectRequestPath(TASK_CONFIGS_PATH);
+
+        internalResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(TASK_CONFIGS));
+    }
+
+    @Test
+    public void testPutConnectorTaskConfigsWithInternalRequestSignature() throws Throwable {
+        final String macAlgorithm = "HmacSHA256";
+        final String base64Signature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4=";
+
+        // Stub the herder to complete successfully while recording the signature it is handed.
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Callback<Void>> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        final ArgumentCaptor<InternalRequestSignature> signatureCaptor =
+                ArgumentCaptor.forClass(InternalRequestSignature.class);
+        expectAndCallbackResult(callbackCaptor, null)
+                .when(herder)
+                .putTaskConfigs(eq(CONNECTOR_NAME), eq(TASK_CONFIGS), callbackCaptor.capture(), signatureCaptor.capture());
+
+        // Request headers carrying both the signature algorithm and the Base64-encoded signature.
+        final HttpHeaders signedHeaders = mock(HttpHeaders.class);
+        when(signedHeaders.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER))
+                .thenReturn(macAlgorithm);
+        when(signedHeaders.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER))
+                .thenReturn(base64Signature);
+        expectRequestPath(TASK_CONFIGS_PATH);
+
+        internalResource.putTaskConfigs(CONNECTOR_NAME, signedHeaders, FORWARD, serializeAsBytes(TASK_CONFIGS));
+
+        // The herder must have received a signature equal to one built from the same body, algorithm and bytes.
+        final byte[] expectedBody = serializeAsBytes(TASK_CONFIGS);
+        final Mac expectedMac = Mac.getInstance(macAlgorithm);
+        final byte[] expectedBytes = Base64.getDecoder().decode(base64Signature);
+
+        assertEquals(
+                new InternalRequestSignature(expectedBody, expectedMac, expectedBytes),
+                signatureCaptor.getValue()
+        );
+    }
+
+    @Test
+    public void testPutConnectorTaskConfigsConnectorNotFound() {
+        // The herder reports the connector as missing through its callback.
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Callback<Void>> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        final NotFoundException missingConnector = new NotFoundException("not found");
+        expectAndCallbackException(callbackCaptor, missingConnector)
+                .when(herder).putTaskConfigs(eq(CONNECTOR_NAME), eq(TASK_CONFIGS), callbackCaptor.capture(), any());
+        expectRequestPath(TASK_CONFIGS_PATH);
+
+        // The resource call is expected to fail with that same exception type.
+        assertThrows(
+                NotFoundException.class,
+                () -> internalResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(TASK_CONFIGS)));
+    }
+
+    @Test
+    public void testFenceZombiesNoInternalRequestSignature() throws Throwable {
+        // With no headers on the request, the herder is expected to be handed a null signature.
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Callback<Void>> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        final Stubber completeWithoutError = expectAndCallbackResult(callbackCaptor, null);
+        completeWithoutError.when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), callbackCaptor.capture(), isNull());
+        expectRequestPath(FENCE_PATH);
+        internalResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null));
+    }
+
+    @Test
+    public void testFenceZombiesWithInternalRequestSignature() throws Throwable {
+        final String macAlgorithm = "HmacSHA256";
+        final String base64Signature = "Kv1/OSsxzdVIwvZ4e30avyRIVrngDfhzVUm/kAZEKc4=";
+
+        // Stub the herder to complete successfully while recording the signature it is handed.
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Callback<Void>> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        final ArgumentCaptor<InternalRequestSignature> signatureCaptor =
+                ArgumentCaptor.forClass(InternalRequestSignature.class);
+        expectAndCallbackResult(callbackCaptor, null).when(herder)
+                .fenceZombieSourceTasks(eq(CONNECTOR_NAME), callbackCaptor.capture(), signatureCaptor.capture());
+
+        // Request headers carrying both the signature algorithm and the Base64-encoded signature.
+        final HttpHeaders signedHeaders = mock(HttpHeaders.class);
+        when(signedHeaders.getHeaderString(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER)).thenReturn(macAlgorithm);
+        when(signedHeaders.getHeaderString(InternalRequestSignature.SIGNATURE_HEADER)).thenReturn(base64Signature);
+        expectRequestPath(FENCE_PATH);
+
+        internalResource.fenceZombies(CONNECTOR_NAME, signedHeaders, FORWARD, serializeAsBytes(null));
+
+        // The herder must have received a signature equal to one built from the same body, algorithm and bytes.
+        final byte[] expectedBody = serializeAsBytes(null);
+        final Mac expectedMac = Mac.getInstance(macAlgorithm);
+        final byte[] expectedBytes = Base64.getDecoder().decode(base64Signature);
+        assertEquals(
+                new InternalRequestSignature(expectedBody, expectedMac, expectedBytes),
+                signatureCaptor.getValue()
+        );
+    }
+
+    @Test
+    public void testFenceZombiesConnectorNotFound() throws Throwable {
+        // The herder reports the connector as missing through its callback.
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Callback<Void>> callbackCaptor = ArgumentCaptor.forClass(Callback.class);
+        final NotFoundException missingConnector = new NotFoundException("not found");
+        expectAndCallbackException(callbackCaptor, missingConnector)
+                .when(herder).fenceZombieSourceTasks(eq(CONNECTOR_NAME), callbackCaptor.capture(), any());
+        expectRequestPath(FENCE_PATH);
+        assertThrows(NotFoundException.class, () ->
+                internalResource.fenceZombies(CONNECTOR_NAME, NULL_HEADERS, FORWARD, serializeAsBytes(null)));
+    }
+
+    private <T> byte[] serializeAsBytes(final T value) throws IOException { // value may be null; the fence tests pass null
+        return new ObjectMapper().writeValueAsBytes(value); // serialize with a freshly created, default-configured Jackson ObjectMapper
+    }
+
+    private <T> Stubber expectAndCallbackResult(final ArgumentCaptor<Callback<T>> cb, final T value) { // returns a Stubber; callers chain .when(mock).method(...)
+        return doAnswer(invocation -> { // the invocation argument itself is not inspected
+            cb.getValue().onCompletion(null, value); // complete the captured callback with no error and the supplied result
+            return null; // the answer itself always yields null
+        });
+    }
+
+    private <T> Stubber expectAndCallbackException(final ArgumentCaptor<Callback<T>> cb, final Throwable t) { // returns a Stubber; callers chain .when(mock).method(...)
+        return doAnswer(invocation -> { // the invocation argument itself is not inspected
+            cb.getValue().onCompletion(t, null); // complete the captured callback with the supplied error and no result
+            return null; // the answer itself always yields null
+        });
+    }
+
+    private void expectRequestPath(String path) {
+        when(uriInfo.getPath()).thenReturn(path); // stub the mocked UriInfo (assigned to the resource in setup()) to report this path
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
index b8ffbcffbed..8d486bbb9a3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java
@@ -16,11 +16,8 @@
  */
 package org.apache.kafka.connect.runtime.rest.util;
 
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
-import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.junit.Assert;
 import org.junit.Test;
@@ -31,17 +28,6 @@ import java.util.Map;
 
 @SuppressWarnings("deprecation")
 public class SSLUtilsTest {
-    private static final Map<String, String> DEFAULT_CONFIG = new HashMap<>();
-    static {
-        // The WorkerConfig base class has some required settings without defaults
-        DEFAULT_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
-        DEFAULT_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
-        DEFAULT_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        DEFAULT_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "connect-test-group");
-        DEFAULT_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
-    }
 
     @Test
     public void testGetOrDefault() {
@@ -58,7 +44,7 @@ public class SSLUtilsTest {
 
     @Test
     public void testCreateServerSideSslContextFactory() {
-        Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
+        Map<String, String> configMap = new HashMap<>();
         configMap.put("ssl.keystore.location", "/path/to/keystore");
         configMap.put("ssl.keystore.password", "123456");
         configMap.put("ssl.key.password", "123456");
@@ -76,7 +62,7 @@ public class SSLUtilsTest {
         configMap.put("ssl.keymanager.algorithm", "SunX509");
         configMap.put("ssl.trustmanager.algorithm", "PKIX");
 
-        DistributedConfig config = new DistributedConfig(configMap);
+        RestServerConfig config = RestServerConfig.forPublic(null, configMap);
         SslContextFactory ssl = SSLUtils.createServerSideSslContextFactory(config);
 
         Assert.assertEquals("file:///path/to/keystore", ssl.getKeyStorePath());
@@ -96,7 +82,7 @@ public class SSLUtilsTest {
 
     @Test
     public void testCreateClientSideSslContextFactory() {
-        Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
+        Map<String, String> configMap = new HashMap<>();
         configMap.put("ssl.keystore.location", "/path/to/keystore");
         configMap.put("ssl.keystore.password", "123456");
         configMap.put("ssl.key.password", "123456");
@@ -114,7 +100,7 @@ public class SSLUtilsTest {
         configMap.put("ssl.keymanager.algorithm", "SunX509");
         configMap.put("ssl.trustmanager.algorithm", "PKIX");
 
-        DistributedConfig config = new DistributedConfig(configMap);
+        RestServerConfig config = RestServerConfig.forPublic(null, configMap);
         SslContextFactory ssl = SSLUtils.createClientSideSslContextFactory(config);
 
         Assert.assertEquals("file:///path/to/keystore", ssl.getKeyStorePath());
@@ -134,10 +120,7 @@ public class SSLUtilsTest {
 
     @Test
     public void testCreateServerSideSslContextFactoryDefaultValues() {
-        Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
-        configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file");
-        configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        Map<String, String> configMap = new HashMap<>();
         configMap.put("ssl.keystore.location", "/path/to/keystore");
         configMap.put("ssl.keystore.password", "123456");
         configMap.put("ssl.key.password", "123456");
@@ -147,7 +130,7 @@ public class SSLUtilsTest {
         configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5");
         configMap.put("ssl.secure.random.implementation", "SHA1PRNG");
 
-        DistributedConfig config = new DistributedConfig(configMap);
+        RestServerConfig config = RestServerConfig.forPublic(null, configMap);
         SslContextFactory ssl = SSLUtils.createServerSideSslContextFactory(config);
 
         Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ssl.getKeyStoreType());
@@ -162,10 +145,7 @@ public class SSLUtilsTest {
 
     @Test
     public void testCreateClientSideSslContextFactoryDefaultValues() {
-        Map<String, String> configMap = new HashMap<>(DEFAULT_CONFIG);
-        configMap.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/offset/file");
-        configMap.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        configMap.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        Map<String, String> configMap = new HashMap<>();
         configMap.put("ssl.keystore.location", "/path/to/keystore");
         configMap.put("ssl.keystore.password", "123456");
         configMap.put("ssl.key.password", "123456");
@@ -175,7 +155,7 @@ public class SSLUtilsTest {
         configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5");
         configMap.put("ssl.secure.random.implementation", "SHA1PRNG");
 
-        DistributedConfig config = new DistributedConfig(configMap);
+        RestServerConfig config = RestServerConfig.forPublic(null, configMap);
         SslContextFactory ssl = SSLUtils.createClientSideSslContextFactory(config);
 
         Assert.assertEquals(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ssl.getKeyStoreType());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 9b91b55e92f..9687fb8c1f0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -54,7 +54,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.connect.runtime.WorkerConfig.LISTENERS_CONFIG;
+import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG;