You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/07 22:35:41 UTC

[kafka] branch 2.2 updated: KAFKA-8304: Fix registration of Connect REST extensions (#6651)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new a569a81a KAFKA-8304: Fix registration of Connect REST extensions (#6651)
a569a81a is described below

commit a569a81a6c6cce47261200fe1b0e75085b4e409d
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue May 7 15:20:51 2019 -0700

    KAFKA-8304: Fix registration of Connect REST extensions (#6651)
    
    Fix registration of Connect REST extensions to prevent deadlocks when extensions get the list of connectors before the herder is available. Added integration test to check the behavior.
    
    Author: Chris Egerton <ce...@oberlin.edu>
    Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 checkstyle/import-control.xml                      |   1 +
 .../kafka/connect/cli/ConnectDistributed.java      |   6 +-
 .../kafka/connect/cli/ConnectStandalone.java       |   6 +-
 .../org/apache/kafka/connect/runtime/Connect.java  |   1 +
 .../kafka/connect/runtime/HerderProvider.java      |  68 ------------
 .../runtime/health/ConnectClusterStateImpl.java    |  12 +--
 .../kafka/connect/runtime/rest/RestServer.java     |  66 +++++++-----
 .../rest/resources/ConnectorPluginsResource.java   |  12 +--
 .../runtime/rest/resources/ConnectorsResource.java |  39 +++----
 .../runtime/rest/resources/RootResource.java       |   8 +-
 .../integration/RestExtensionIntegrationTest.java  | 119 +++++++++++++++++++++
 .../health/ConnectClusterStateImplTest.java        |   5 +-
 .../kafka/connect/runtime/rest/RestServerTest.java |  10 +-
 .../resources/ConnectorPluginsResourceTest.java    |   3 +-
 .../rest/resources/ConnectorsResourceTest.java     |   3 +-
 .../runtime/rest/resources/RootResourceTest.java   |   3 +-
 .../util/clusters/EmbeddedConnectCluster.java      |   2 +-
 17 files changed, 206 insertions(+), 158 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 072d706..f4955ce 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -381,6 +381,7 @@
       <allow pkg="org.apache.kafka.connect.util.clusters" />
       <allow pkg="org.apache.kafka.connect" />
       <allow pkg="org.apache.kafka.tools" />
+      <allow pkg="javax.ws.rs" />
     </subpackage>
 
     <subpackage name="json">
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index a6c6d98..17d65ac 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerInfo;
@@ -95,8 +94,7 @@ public class ConnectDistributed {
         log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
         RestServer rest = new RestServer(config);
-        HerderProvider provider = new HerderProvider();
-        rest.start(provider, plugins);
+        rest.initializeServer();
 
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@@ -124,8 +122,6 @@ public class ConnectDistributed {
         log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
         try {
             connect.start();
-            // herder has initialized now, and ready to be used by the RestServer.
-            provider.setHerder(herder);
         } catch (Exception e) {
             log.error("Failed to start Connect", e);
             connect.stop();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index dd1cf0f..499e6df 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerInfo;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -83,8 +82,7 @@ public class ConnectStandalone {
             log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
             RestServer rest = new RestServer(config);
-            HerderProvider provider = new HerderProvider();
-            rest.start(provider, plugins);
+            rest.initializeServer();
 
             URI advertisedUrl = rest.advertisedUrl();
             String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@@ -97,8 +95,6 @@ public class ConnectStandalone {
 
             try {
                 connect.start();
-                // herder has initialized now, and ready to be used by the RestServer.
-                provider.setHerder(herder);
                 for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
                     Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
                     FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 965046c..4a0bcab 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -51,6 +51,7 @@ public class Connect {
             Runtime.getRuntime().addShutdownHook(shutdownHook);
 
             herder.start();
+            rest.initializeResources(herder);
 
             log.info("Kafka Connect started");
         } finally {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
deleted file mode 100644
index 42c0925..0000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime;
-
-import org.apache.kafka.connect.errors.ConnectException;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A supplier for {@link Herder}s.
- */
-public class HerderProvider {
-
-    private final CountDownLatch initialized = new CountDownLatch(1);
-    volatile Herder herder = null;
-
-    public HerderProvider() {
-    }
-
-    /**
-     * Create a herder provider with a herder.
-     * @param herder the herder that will be supplied to threads waiting on this provider
-     */
-    public HerderProvider(Herder herder) {
-        this.herder = herder;
-        initialized.countDown();
-    }
-
-    /**
-     * @return the contained herder.
-     * @throws ConnectException if a herder was not available within a duration of calling this method
-     */
-    public Herder get() {
-        try {
-            // wait for herder to be initialized
-            if (!initialized.await(1, TimeUnit.MINUTES)) {
-                throw new ConnectException("Timed out waiting for herder to be initialized.");
-            }
-        } catch (InterruptedException e) {
-            throw new ConnectException("Interrupted while waiting for herder to be initialized.", e);
-        }
-        return herder;
-    }
-
-    /**
-     * @param herder set a herder, and signal to all threads waiting on get().
-     */
-    public void setHerder(Herder herder) {
-        this.herder = herder;
-        initialized.countDown();
-    }
-
-}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index 32f7add..e3a4833 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.connect.health.ConnectorHealth;
 import org.apache.kafka.connect.health.ConnectorState;
 import org.apache.kafka.connect.health.ConnectorType;
 import org.apache.kafka.connect.health.TaskState;
-import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.util.FutureCallback;
 
@@ -38,17 +38,17 @@ import java.util.concurrent.TimeoutException;
 public class ConnectClusterStateImpl implements ConnectClusterState {
     
     private final long herderRequestTimeoutMs;
-    private final HerderProvider herderProvider;
+    private final Herder herder;
 
-    public ConnectClusterStateImpl(long connectorsTimeoutMs, HerderProvider herderProvider) {
+    public ConnectClusterStateImpl(long connectorsTimeoutMs, Herder herder) {
         this.herderRequestTimeoutMs = connectorsTimeoutMs;
-        this.herderProvider = herderProvider;
+        this.herder = herder;
     }
 
     @Override
     public Collection<String> connectors() {
         FutureCallback<Collection<String>> connectorsCallback = new FutureCallback<>();
-        herderProvider.get().connectors(connectorsCallback);
+        herder.connectors(connectorsCallback);
         try {
             return connectorsCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
@@ -59,7 +59,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
     @Override
     public ConnectorHealth connectorHealth(String connName) {
 
-        ConnectorStateInfo state = herderProvider.get().connectorStatus(connName);
+        ConnectorStateInfo state = herder.connectorStatus(connName);
         ConnectorState connectorState = new ConnectorState(
             state.connector().state(),
             state.connector().workerId(),
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 1840b24..5d60771 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -21,10 +21,9 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
-import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
-import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@@ -35,8 +34,8 @@ import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.Slf4jRequestLog;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
 import org.eclipse.jetty.server.handler.DefaultHandler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
 import org.eclipse.jetty.servlet.FilterHolder;
@@ -75,6 +74,7 @@ public class RestServer {
     private static final String PROTOCOL_HTTPS = "https";
 
     private final WorkerConfig config;
+    private ContextHandlerCollection handlers;
     private Server jettyServer;
 
     private List<ConnectRestExtension> connectRestExtensions = Collections.emptyList();
@@ -88,6 +88,7 @@ public class RestServer {
         List<String> listeners = parseListeners();
 
         jettyServer = new Server();
+        handlers = new ContextHandlerCollection();
 
         createConnectors(listeners);
     }
@@ -160,20 +161,40 @@ public class RestServer {
         return connector;
     }
 
-    public void start(HerderProvider herderProvider, Plugins plugins) {
-        log.info("Starting REST server");
+    public void initializeServer() {
+        log.info("Initializing REST server");
+
+        /* Needed for graceful shutdown as per `setStopTimeout` documentation */
+        StatisticsHandler statsHandler = new StatisticsHandler();
+        statsHandler.setHandler(handlers);
+        jettyServer.setHandler(statsHandler);
+        jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
+        jettyServer.setStopAtShutdown(true);
+
+        try {
+            jettyServer.start();
+        } catch (Exception e) {
+            throw new ConnectException("Unable to initialize REST server", e);
+        }
+
+        log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
+    }
+
+    @SuppressWarnings("deprecation")
+    public void initializeResources(Herder herder) {
+        log.info("Initializing REST resources");
 
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
-        resourceConfig.register(new RootResource(herderProvider));
-        resourceConfig.register(new ConnectorsResource(herderProvider, config));
-        resourceConfig.register(new ConnectorPluginsResource(herderProvider));
+        resourceConfig.register(new RootResource(herder));
+        resourceConfig.register(new ConnectorsResource(herder, config));
+        resourceConfig.register(new ConnectorPluginsResource(herder));
 
         resourceConfig.register(ConnectExceptionMapper.class);
         resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
 
-        registerRestExtensions(herderProvider, plugins, resourceConfig);
+        registerRestExtensions(herder, resourceConfig);
 
         ServletContainer servletContainer = new ServletContainer(resourceConfig);
         ServletHolder servletHolder = new ServletHolder(servletContainer);
@@ -200,23 +221,14 @@ public class RestServer {
         requestLog.setLogLatency(true);
         requestLogHandler.setRequestLog(requestLog);
 
-        HandlerCollection handlers = new HandlerCollection();
         handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});
-
-        /* Needed for graceful shutdown as per `setStopTimeout` documentation */
-        StatisticsHandler statsHandler = new StatisticsHandler();
-        statsHandler.setHandler(handlers);
-        jettyServer.setHandler(statsHandler);
-        jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
-        jettyServer.setStopAtShutdown(true);
-
         try {
-            jettyServer.start();
+            context.start();
         } catch (Exception e) {
-            throw new ConnectException("Unable to start REST server", e);
+            throw new ConnectException("Unable to initialize REST resources", e);
         }
 
-        log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
+        log.info("REST resources initialized; server is started and ready to handle requests");
     }
 
     public URI serverUrl() {
@@ -237,9 +249,8 @@ public class RestServer {
             jettyServer.stop();
             jettyServer.join();
         } catch (Exception e) {
-            throw new ConnectException("Unable to stop REST server", e);
-        } finally {
             jettyServer.destroy();
+            throw new ConnectException("Unable to stop REST server", e);
         }
 
         log.info("REST server stopped");
@@ -247,7 +258,8 @@ public class RestServer {
 
     /**
      * Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty
-     * server, unless overrides for advertised hostname and/or port are provided via configs.
+     * server, unless overrides for advertised hostname and/or port are provided via configs. {@link #initializeServer()}
+     * must be invoked successfully before calling this method.
      */
     public URI advertisedUrl() {
         UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
@@ -303,8 +315,8 @@ public class RestServer {
         return null;
     }
 
-    void registerRestExtensions(HerderProvider provider, Plugins plugins, ResourceConfig resourceConfig) {
-        connectRestExtensions = plugins.newPlugins(
+    void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
+        connectRestExtensions = herder.plugins().newPlugins(
             config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
             config, ConnectRestExtension.class);
 
@@ -319,7 +331,7 @@ public class RestServer {
         ConnectRestExtensionContext connectRestExtensionContext =
             new ConnectRestExtensionContextImpl(
                 new ConnectRestConfigurable(resourceConfig),
-                new ConnectClusterStateImpl(herderRequestTimeoutMs, provider)
+                new ConnectClusterStateImpl(herderRequestTimeoutMs, herder)
             );
         for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
             connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 87f25b2..24eb93b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -18,7 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
@@ -49,7 +49,7 @@ import java.util.Map;
 public class ConnectorPluginsResource {
 
     private static final String ALIAS_SUFFIX = "Connector";
-    private final HerderProvider herderProvider;
+    private final Herder herder;
     private final List<ConnectorPluginInfo> connectorPlugins;
 
     private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
@@ -58,8 +58,8 @@ public class ConnectorPluginsResource {
             SchemaSourceConnector.class
     );
 
-    public ConnectorPluginsResource(HerderProvider herderProvider) {
-        this.herderProvider = herderProvider;
+    public ConnectorPluginsResource(Herder herder) {
+        this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
     }
 
@@ -78,7 +78,7 @@ public class ConnectorPluginsResource {
             );
         }
 
-        return herderProvider.get().validateConnectorConfig(connectorConfig);
+        return herder.validateConnectorConfig(connectorConfig);
     }
 
     @GET
@@ -90,7 +90,7 @@ public class ConnectorPluginsResource {
     // TODO: improve once plugins are allowed to be added/removed during runtime.
     private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
         if (connectorPlugins.isEmpty()) {
-            for (PluginDesc<Connector> plugin : herderProvider.get().plugins().connectors()) {
+            for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
                 if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
                     connectorPlugins.add(new ConnectorPluginInfo(plugin));
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index d4d84f1..4a04512 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources;
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
 import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
@@ -68,25 +67,21 @@ public class ConnectorsResource {
     // but currently a worker simply leaving the group can take this long as well.
     public static final long REQUEST_TIMEOUT_MS = 90 * 1000;
 
-    private final HerderProvider herderProvider;
+    private final Herder herder;
     private final WorkerConfig config;
     @javax.ws.rs.core.Context
     private ServletContext context;
 
-    public ConnectorsResource(HerderProvider herder, WorkerConfig config) {
-        this.herderProvider = herder;
+    public ConnectorsResource(Herder herder, WorkerConfig config) {
+        this.herder = herder;
         this.config = config;
     }
 
-    private Herder herder() {
-        return herderProvider.get();
-    }
-
     @GET
     @Path("/")
     public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Collection<String>> cb = new FutureCallback<>();
-        herder().connectors(cb);
+        herder.connectors(cb);
         return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
         }, forward);
     }
@@ -104,7 +99,7 @@ public class ConnectorsResource {
         checkAndPutConnectorConfigName(name, configs);
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder().putConnectorConfig(name, configs, false, cb);
+        herder.putConnectorConfig(name, configs, false, cb);
         Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
                 new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
 
@@ -117,7 +112,7 @@ public class ConnectorsResource {
     public ConnectorInfo getConnector(final @PathParam("connector") String connector,
                                       final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
-        herder().connectorInfo(connector, cb);
+        herder.connectorInfo(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward);
     }
 
@@ -126,14 +121,14 @@ public class ConnectorsResource {
     public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector,
                                                   final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
-        herder().connectorConfig(connector, cb);
+        herder.connectorConfig(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward);
     }
 
     @GET
     @Path("/{connector}/status")
     public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
-        return herder().connectorStatus(connector);
+        return herder.connectorStatus(connector);
     }
 
     @PUT
@@ -144,7 +139,7 @@ public class ConnectorsResource {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         checkAndPutConnectorConfigName(connector, connectorConfig);
 
-        herder().putConnectorConfig(connector, connectorConfig, true, cb);
+        herder.putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
                 "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
         Response.ResponseBuilder response;
@@ -162,21 +157,21 @@ public class ConnectorsResource {
     public void restartConnector(final @PathParam("connector") String connector,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
-        herder().restartConnector(connector, cb);
+        herder.restartConnector(connector, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward);
     }
 
     @PUT
     @Path("/{connector}/pause")
     public Response pauseConnector(@PathParam("connector") String connector) {
-        herder().pauseConnector(connector);
+        herder.pauseConnector(connector);
         return Response.accepted().build();
     }
 
     @PUT
     @Path("/{connector}/resume")
     public Response resumeConnector(@PathParam("connector") String connector) {
-        herder().resumeConnector(connector);
+        herder.resumeConnector(connector);
         return Response.accepted().build();
     }
 
@@ -185,7 +180,7 @@ public class ConnectorsResource {
     public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector,
                                          final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
-        herder().taskConfigs(connector, cb);
+        herder.taskConfigs(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
         }, forward);
     }
@@ -196,7 +191,7 @@ public class ConnectorsResource {
                                final @QueryParam("forward") Boolean forward,
                                final List<Map<String, String>> taskConfigs) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
-        herder().putTaskConfigs(connector, taskConfigs, cb);
+        herder.putTaskConfigs(connector, taskConfigs, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward);
     }
 
@@ -204,7 +199,7 @@ public class ConnectorsResource {
     @Path("/{connector}/tasks/{task}/status")
     public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
                                                       final @PathParam("task") Integer task) throws Throwable {
-        return herder().taskStatus(new ConnectorTaskId(connector, task));
+        return herder.taskStatus(new ConnectorTaskId(connector, task));
     }
 
     @POST
@@ -214,7 +209,7 @@ public class ConnectorsResource {
                             final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
-        herder().restartTask(taskId, cb);
+        herder.restartTask(taskId, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward);
     }
 
@@ -223,7 +218,7 @@ public class ConnectorsResource {
     public void destroyConnector(final @PathParam("connector") String connector,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder().deleteConnectorConfig(connector, cb);
+        herder.deleteConnectorConfig(connector, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index 56516cd..9666bf1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;
 
-import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 
 import javax.ws.rs.GET;
@@ -28,15 +28,15 @@ import javax.ws.rs.core.MediaType;
 @Produces(MediaType.APPLICATION_JSON)
 public class RootResource {
 
-    private final HerderProvider herder;
+    private final Herder herder;
 
-    public RootResource(HerderProvider herder) {
+    public RootResource(Herder herder) {
         this.herder = herder;
     }
 
     @GET
     @Path("/")
     public ServerInfo serverInfo() {
-        return new ServerInfo(herder.get().kafkaClusterId());
+        return new ServerInfo(herder.kafkaClusterId());
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
new file mode 100644
index 0000000..d4cac39
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+/**
+ * A simple integration test to ensure that REST extensions are registered correctly.
+ */
+@Category(IntegrationTest.class)
+public class RestExtensionIntegrationTest {
+
+    private static final int NUM_WORKERS = 3;
+    private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
+
+    private EmbeddedConnectCluster connect;
+
+    @Test
+    public void testImmediateRequestForListOfConnectors() throws IOException, InterruptedException {
+        // setup Connect worker properties
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());
+  
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(NUM_WORKERS)
+                .numBrokers(1)
+                .workerProps(workerProps)
+                .build();
+  
+        // start the clusters
+        connect.start();
+
+        waitForCondition(
+            this::extensionIsRegistered,
+            REST_EXTENSION_REGISTRATION_TIMEOUT_MS,
+            "REST extension was never registered"
+        );
+    }
+
+    @After
+    public void close() {
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    private boolean extensionIsRegistered() {
+        try {
+            String extensionUrl = connect.endpointForResource("integration-test-rest-extension/registered");
+            return "true".equals(connect.executeGet(extensionUrl));
+        } catch (ConnectRestException | IOException e) {
+            return false;
+        }
+    }
+
+    public static class IntegrationTestRestExtension implements ConnectRestExtension {
+
+        @Override
+        public void register(ConnectRestExtensionContext restPluginContext) {
+            restPluginContext.clusterState().connectors();
+            restPluginContext.configurable().register(new IntegrationTestRestExtensionResource());
+        }
+    
+        @Override
+        public void close() {
+        }
+    
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    
+        @Override
+        public String version() {
+            return "test";
+        }
+
+        @Path("integration-test-rest-extension")
+        public static class IntegrationTestRestExtensionResource {
+
+            @GET
+            @Path("/registered")
+            public boolean isRegistered() {
+                return true;
+            }
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
index b232a4d..78780f3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.health;
 
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.util.Callback;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -41,15 +40,13 @@ public class ConnectClusterStateImplTest {
   
     @Mock
     protected Herder herder;
-    protected HerderProvider herderProvider;
     protected ConnectClusterStateImpl connectClusterState;
     protected Collection<String> expectedConnectors;
     protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10);
     
     @Before
     public void setUp() {
-        herderProvider = new HerderProvider(herder);
-        connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herderProvider);
+        connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herder);
         expectedConnectors = Arrays.asList("sink1", "source1", "source2");
     }
     
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 128532f..3d297b7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -26,7 +26,6 @@ import org.apache.http.impl.client.HttpClients;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -175,7 +174,8 @@ public class RestServerTest {
         PowerMock.replayAll();
 
         server = new RestServer(workerConfig);
-        server.start(new HerderProvider(herder), herder.plugins());
+        server.initializeServer();
+        server.initializeResources(herder);
 
         HttpOptions request = new HttpOptions("/connectors");
         request.addHeader("Content-Type", MediaType.WILDCARD);
@@ -218,7 +218,8 @@ public class RestServerTest {
         PowerMock.replayAll();
 
         server = new RestServer(workerConfig);
-        server.start(new HerderProvider(herder), herder.plugins());
+        server.initializeServer();
+        server.initializeResources(herder);
         HttpRequest request = new HttpGet("/connectors");
         request.addHeader("Referer", origin + "/page");
         request.addHeader("Origin", origin);
@@ -275,7 +276,8 @@ public class RestServerTest {
         PowerMock.replayAll();
 
         server = new RestServer(workerConfig);
-        server.start(new HerderProvider(herder), herder.plugins());
+        server.initializeServer();
+        server.initializeResources(herder);
         HttpRequest request = new HttpGet("/connectors");
         CloseableHttpClient httpClient = HttpClients.createMinimal();
         HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 684064d..a3aee6a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.TestSinkConnector;
 import org.apache.kafka.connect.runtime.TestSourceConnector;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -187,7 +186,7 @@ public class ConnectorPluginsResourceTest {
 
         plugins = PowerMock.createMock(Plugins.class);
         herder = PowerMock.createMock(AbstractHerder.class);
-        connectorPluginsResource = new ConnectorPluginsResource(new HerderProvider(herder));
+        connectorPluginsResource = new ConnectorPluginsResource(herder);
     }
 
     private void expectPlugins() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 5a52074..f84cd25 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
@@ -127,7 +126,7 @@ public class ConnectorsResourceTest {
     public void setUp() throws NoSuchMethodException {
         PowerMock.mockStatic(RestClient.class,
                 RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
-        connectorsResource = new ConnectorsResource(new HerderProvider(herder), null);
+        connectorsResource = new ConnectorsResource(herder, null);
     }
 
     private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
index be80e28..4e928a3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -40,7 +39,7 @@ public class RootResourceTest extends EasyMockSupport {
 
     @Before
     public void setUp() {
-        rootResource = new RootResource(new HerderProvider(herder));
+        rootResource = new RootResource(herder);
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 590649b..e610812 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -333,7 +333,7 @@ public class EmbeddedConnectCluster {
         }
     }
 
-    private String endpointForResource(String resource) throws IOException {
+    public String endpointForResource(String resource) throws IOException {
         String url = connectCluster.stream()
                 .map(WorkerHandle::url)
                 .filter(Objects::nonNull)