You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/07 22:35:41 UTC
[kafka] branch 2.2 updated: KAFKA-8304: Fix registration of Connect REST extensions (#6651)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push:
new a569a81a KAFKA-8304: Fix registration of Connect REST extensions (#6651)
a569a81a is described below
commit a569a81a6c6cce47261200fe1b0e75085b4e409d
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue May 7 15:20:51 2019 -0700
KAFKA-8304: Fix registration of Connect REST extensions (#6651)
Fix registration of Connect REST extensions to prevent deadlocks when an extension requests the list of connectors before the herder is available. Add an integration test to verify the behavior.
Author: Chris Egerton <ce...@oberlin.edu>
Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
checkstyle/import-control.xml | 1 +
.../kafka/connect/cli/ConnectDistributed.java | 6 +-
.../kafka/connect/cli/ConnectStandalone.java | 6 +-
.../org/apache/kafka/connect/runtime/Connect.java | 1 +
.../kafka/connect/runtime/HerderProvider.java | 68 ------------
.../runtime/health/ConnectClusterStateImpl.java | 12 +--
.../kafka/connect/runtime/rest/RestServer.java | 66 +++++++-----
.../rest/resources/ConnectorPluginsResource.java | 12 +--
.../runtime/rest/resources/ConnectorsResource.java | 39 +++----
.../runtime/rest/resources/RootResource.java | 8 +-
.../integration/RestExtensionIntegrationTest.java | 119 +++++++++++++++++++++
.../health/ConnectClusterStateImplTest.java | 5 +-
.../kafka/connect/runtime/rest/RestServerTest.java | 10 +-
.../resources/ConnectorPluginsResourceTest.java | 3 +-
.../rest/resources/ConnectorsResourceTest.java | 3 +-
.../runtime/rest/resources/RootResourceTest.java | 3 +-
.../util/clusters/EmbeddedConnectCluster.java | 2 +-
17 files changed, 206 insertions(+), 158 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 072d706..f4955ce 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -381,6 +381,7 @@
<allow pkg="org.apache.kafka.connect.util.clusters" />
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.apache.kafka.tools" />
+ <allow pkg="javax.ws.rs" />
</subpackage>
<subpackage name="json">
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index a6c6d98..17d65ac 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
-import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerInfo;
@@ -95,8 +94,7 @@ public class ConnectDistributed {
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
- HerderProvider provider = new HerderProvider();
- rest.start(provider, plugins);
+ rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@@ -124,8 +122,6 @@ public class ConnectDistributed {
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
try {
connect.start();
- // herder has initialized now, and ready to be used by the RestServer.
- provider.setHerder(herder);
} catch (Exception e) {
log.error("Failed to start Connect", e);
connect.stop();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index dd1cf0f..499e6df 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerInfo;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -83,8 +82,7 @@ public class ConnectStandalone {
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
- HerderProvider provider = new HerderProvider();
- rest.start(provider, plugins);
+ rest.initializeServer();
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
@@ -97,8 +95,6 @@ public class ConnectStandalone {
try {
connect.start();
- // herder has initialized now, and ready to be used by the RestServer.
- provider.setHerder(herder);
for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 965046c..4a0bcab 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -51,6 +51,7 @@ public class Connect {
Runtime.getRuntime().addShutdownHook(shutdownHook);
herder.start();
+ rest.initializeResources(herder);
log.info("Kafka Connect started");
} finally {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
deleted file mode 100644
index 42c0925..0000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime;
-
-import org.apache.kafka.connect.errors.ConnectException;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A supplier for {@link Herder}s.
- */
-public class HerderProvider {
-
- private final CountDownLatch initialized = new CountDownLatch(1);
- volatile Herder herder = null;
-
- public HerderProvider() {
- }
-
- /**
- * Create a herder provider with a herder.
- * @param herder the herder that will be supplied to threads waiting on this provider
- */
- public HerderProvider(Herder herder) {
- this.herder = herder;
- initialized.countDown();
- }
-
- /**
- * @return the contained herder.
- * @throws ConnectException if a herder was not available within a duration of calling this method
- */
- public Herder get() {
- try {
- // wait for herder to be initialized
- if (!initialized.await(1, TimeUnit.MINUTES)) {
- throw new ConnectException("Timed out waiting for herder to be initialized.");
- }
- } catch (InterruptedException e) {
- throw new ConnectException("Interrupted while waiting for herder to be initialized.", e);
- }
- return herder;
- }
-
- /**
- * @param herder set a herder, and signal to all threads waiting on get().
- */
- public void setHerder(Herder herder) {
- this.herder = herder;
- initialized.countDown();
- }
-
-}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index 32f7add..e3a4833 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
-import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.FutureCallback;
@@ -38,17 +38,17 @@ import java.util.concurrent.TimeoutException;
public class ConnectClusterStateImpl implements ConnectClusterState {
private final long herderRequestTimeoutMs;
- private final HerderProvider herderProvider;
+ private final Herder herder;
- public ConnectClusterStateImpl(long connectorsTimeoutMs, HerderProvider herderProvider) {
+ public ConnectClusterStateImpl(long connectorsTimeoutMs, Herder herder) {
this.herderRequestTimeoutMs = connectorsTimeoutMs;
- this.herderProvider = herderProvider;
+ this.herder = herder;
}
@Override
public Collection<String> connectors() {
FutureCallback<Collection<String>> connectorsCallback = new FutureCallback<>();
- herderProvider.get().connectors(connectorsCallback);
+ herder.connectors(connectorsCallback);
try {
return connectorsCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
@@ -59,7 +59,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
@Override
public ConnectorHealth connectorHealth(String connName) {
- ConnectorStateInfo state = herderProvider.get().connectorStatus(connName);
+ ConnectorStateInfo state = herder.connectorStatus(connName);
ConnectorState connectorState = new ConnectorState(
state.connector().state(),
state.connector().workerId(),
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 1840b24..5d60771 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -21,10 +21,9 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
-import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
-import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@@ -35,8 +34,8 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.Slf4jRequestLog;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.FilterHolder;
@@ -75,6 +74,7 @@ public class RestServer {
private static final String PROTOCOL_HTTPS = "https";
private final WorkerConfig config;
+ private ContextHandlerCollection handlers;
private Server jettyServer;
private List<ConnectRestExtension> connectRestExtensions = Collections.emptyList();
@@ -88,6 +88,7 @@ public class RestServer {
List<String> listeners = parseListeners();
jettyServer = new Server();
+ handlers = new ContextHandlerCollection();
createConnectors(listeners);
}
@@ -160,20 +161,40 @@ public class RestServer {
return connector;
}
- public void start(HerderProvider herderProvider, Plugins plugins) {
- log.info("Starting REST server");
+ public void initializeServer() {
+ log.info("Initializing REST server");
+
+ /* Needed for graceful shutdown as per `setStopTimeout` documentation */
+ StatisticsHandler statsHandler = new StatisticsHandler();
+ statsHandler.setHandler(handlers);
+ jettyServer.setHandler(statsHandler);
+ jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
+ jettyServer.setStopAtShutdown(true);
+
+ try {
+ jettyServer.start();
+ } catch (Exception e) {
+ throw new ConnectException("Unable to initialize REST server", e);
+ }
+
+ log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
+ }
+
+ @SuppressWarnings("deprecation")
+ public void initializeResources(Herder herder) {
+ log.info("Initializing REST resources");
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.register(new JacksonJsonProvider());
- resourceConfig.register(new RootResource(herderProvider));
- resourceConfig.register(new ConnectorsResource(herderProvider, config));
- resourceConfig.register(new ConnectorPluginsResource(herderProvider));
+ resourceConfig.register(new RootResource(herder));
+ resourceConfig.register(new ConnectorsResource(herder, config));
+ resourceConfig.register(new ConnectorPluginsResource(herder));
resourceConfig.register(ConnectExceptionMapper.class);
resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
- registerRestExtensions(herderProvider, plugins, resourceConfig);
+ registerRestExtensions(herder, resourceConfig);
ServletContainer servletContainer = new ServletContainer(resourceConfig);
ServletHolder servletHolder = new ServletHolder(servletContainer);
@@ -200,23 +221,14 @@ public class RestServer {
requestLog.setLogLatency(true);
requestLogHandler.setRequestLog(requestLog);
- HandlerCollection handlers = new HandlerCollection();
handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});
-
- /* Needed for graceful shutdown as per `setStopTimeout` documentation */
- StatisticsHandler statsHandler = new StatisticsHandler();
- statsHandler.setHandler(handlers);
- jettyServer.setHandler(statsHandler);
- jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
- jettyServer.setStopAtShutdown(true);
-
try {
- jettyServer.start();
+ context.start();
} catch (Exception e) {
- throw new ConnectException("Unable to start REST server", e);
+ throw new ConnectException("Unable to initialize REST resources", e);
}
- log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
+ log.info("REST resources initialized; server is started and ready to handle requests");
}
public URI serverUrl() {
@@ -237,9 +249,8 @@ public class RestServer {
jettyServer.stop();
jettyServer.join();
} catch (Exception e) {
- throw new ConnectException("Unable to stop REST server", e);
- } finally {
jettyServer.destroy();
+ throw new ConnectException("Unable to stop REST server", e);
}
log.info("REST server stopped");
@@ -247,7 +258,8 @@ public class RestServer {
/**
* Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty
- * server, unless overrides for advertised hostname and/or port are provided via configs.
+ * server, unless overrides for advertised hostname and/or port are provided via configs. {@link #initializeServer()}
+ * must be invoked successfully before calling this method.
*/
public URI advertisedUrl() {
UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
@@ -303,8 +315,8 @@ public class RestServer {
return null;
}
- void registerRestExtensions(HerderProvider provider, Plugins plugins, ResourceConfig resourceConfig) {
- connectRestExtensions = plugins.newPlugins(
+ void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
+ connectRestExtensions = herder.plugins().newPlugins(
config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
config, ConnectRestExtension.class);
@@ -319,7 +331,7 @@ public class RestServer {
ConnectRestExtensionContext connectRestExtensionContext =
new ConnectRestExtensionContextImpl(
new ConnectRestConfigurable(resourceConfig),
- new ConnectClusterStateImpl(herderRequestTimeoutMs, provider)
+ new ConnectClusterStateImpl(herderRequestTimeoutMs, herder)
);
for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 87f25b2..24eb93b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -18,7 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
@@ -49,7 +49,7 @@ import java.util.Map;
public class ConnectorPluginsResource {
private static final String ALIAS_SUFFIX = "Connector";
- private final HerderProvider herderProvider;
+ private final Herder herder;
private final List<ConnectorPluginInfo> connectorPlugins;
private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
@@ -58,8 +58,8 @@ public class ConnectorPluginsResource {
SchemaSourceConnector.class
);
- public ConnectorPluginsResource(HerderProvider herderProvider) {
- this.herderProvider = herderProvider;
+ public ConnectorPluginsResource(Herder herder) {
+ this.herder = herder;
this.connectorPlugins = new ArrayList<>();
}
@@ -78,7 +78,7 @@ public class ConnectorPluginsResource {
);
}
- return herderProvider.get().validateConnectorConfig(connectorConfig);
+ return herder.validateConnectorConfig(connectorConfig);
}
@GET
@@ -90,7 +90,7 @@ public class ConnectorPluginsResource {
// TODO: improve once plugins are allowed to be added/removed during runtime.
private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
if (connectorPlugins.isEmpty()) {
- for (PluginDesc<Connector> plugin : herderProvider.get().plugins().connectors()) {
+ for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
connectorPlugins.add(new ConnectorPluginInfo(plugin));
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index d4d84f1..4a04512 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
@@ -68,25 +67,21 @@ public class ConnectorsResource {
// but currently a worker simply leaving the group can take this long as well.
public static final long REQUEST_TIMEOUT_MS = 90 * 1000;
- private final HerderProvider herderProvider;
+ private final Herder herder;
private final WorkerConfig config;
@javax.ws.rs.core.Context
private ServletContext context;
- public ConnectorsResource(HerderProvider herder, WorkerConfig config) {
- this.herderProvider = herder;
+ public ConnectorsResource(Herder herder, WorkerConfig config) {
+ this.herder = herder;
this.config = config;
}
- private Herder herder() {
- return herderProvider.get();
- }
-
@GET
@Path("/")
public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Collection<String>> cb = new FutureCallback<>();
- herder().connectors(cb);
+ herder.connectors(cb);
return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
}, forward);
}
@@ -104,7 +99,7 @@ public class ConnectorsResource {
checkAndPutConnectorConfigName(name, configs);
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
- herder().putConnectorConfig(name, configs, false, cb);
+ herder.putConnectorConfig(name, configs, false, cb);
Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
@@ -117,7 +112,7 @@ public class ConnectorsResource {
public ConnectorInfo getConnector(final @PathParam("connector") String connector,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
- herder().connectorInfo(connector, cb);
+ herder.connectorInfo(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward);
}
@@ -126,14 +121,14 @@ public class ConnectorsResource {
public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Map<String, String>> cb = new FutureCallback<>();
- herder().connectorConfig(connector, cb);
+ herder.connectorConfig(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward);
}
@GET
@Path("/{connector}/status")
public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
- return herder().connectorStatus(connector);
+ return herder.connectorStatus(connector);
}
@PUT
@@ -144,7 +139,7 @@ public class ConnectorsResource {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
checkAndPutConnectorConfigName(connector, connectorConfig);
- herder().putConnectorConfig(connector, connectorConfig, true, cb);
+ herder.putConnectorConfig(connector, connectorConfig, true, cb);
Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
"PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
Response.ResponseBuilder response;
@@ -162,21 +157,21 @@ public class ConnectorsResource {
public void restartConnector(final @PathParam("connector") String connector,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
- herder().restartConnector(connector, cb);
+ herder.restartConnector(connector, cb);
completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward);
}
@PUT
@Path("/{connector}/pause")
public Response pauseConnector(@PathParam("connector") String connector) {
- herder().pauseConnector(connector);
+ herder.pauseConnector(connector);
return Response.accepted().build();
}
@PUT
@Path("/{connector}/resume")
public Response resumeConnector(@PathParam("connector") String connector) {
- herder().resumeConnector(connector);
+ herder.resumeConnector(connector);
return Response.accepted().build();
}
@@ -185,7 +180,7 @@ public class ConnectorsResource {
public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
- herder().taskConfigs(connector, cb);
+ herder.taskConfigs(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
}, forward);
}
@@ -196,7 +191,7 @@ public class ConnectorsResource {
final @QueryParam("forward") Boolean forward,
final List<Map<String, String>> taskConfigs) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
- herder().putTaskConfigs(connector, taskConfigs, cb);
+ herder.putTaskConfigs(connector, taskConfigs, cb);
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward);
}
@@ -204,7 +199,7 @@ public class ConnectorsResource {
@Path("/{connector}/tasks/{task}/status")
public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
final @PathParam("task") Integer task) throws Throwable {
- return herder().taskStatus(new ConnectorTaskId(connector, task));
+ return herder.taskStatus(new ConnectorTaskId(connector, task));
}
@POST
@@ -214,7 +209,7 @@ public class ConnectorsResource {
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
- herder().restartTask(taskId, cb);
+ herder.restartTask(taskId, cb);
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward);
}
@@ -223,7 +218,7 @@ public class ConnectorsResource {
public void destroyConnector(final @PathParam("connector") String connector,
final @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
- herder().deleteConnectorConfig(connector, cb);
+ herder.deleteConnectorConfig(connector, cb);
completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index 56516cd..9666bf1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.connect.runtime.rest.resources;
-import org.apache.kafka.connect.runtime.HerderProvider;
+import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import javax.ws.rs.GET;
@@ -28,15 +28,15 @@ import javax.ws.rs.core.MediaType;
@Produces(MediaType.APPLICATION_JSON)
public class RootResource {
- private final HerderProvider herder;
+ private final Herder herder;
- public RootResource(HerderProvider herder) {
+ public RootResource(Herder herder) {
this.herder = herder;
}
@GET
@Path("/")
public ServerInfo serverInfo() {
- return new ServerInfo(herder.get().kafkaClusterId());
+ return new ServerInfo(herder.kafkaClusterId());
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
new file mode 100644
index 0000000..d4cac39
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+/**
+ * A simple integration test to ensure that REST extensions are registered correctly.
+ */
+@Category(IntegrationTest.class)
+public class RestExtensionIntegrationTest {
+
+ private static final int NUM_WORKERS = 3;
+ private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
+
+ private EmbeddedConnectCluster connect;
+
+ @Test
+ public void testImmediateRequestForListOfConnectors() throws IOException, InterruptedException {
+ // setup Connect worker properties
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());
+
+ // build a Connect cluster backed by Kafka and Zk
+ connect = new EmbeddedConnectCluster.Builder()
+ .name("connect-cluster")
+ .numWorkers(NUM_WORKERS)
+ .numBrokers(1)
+ .workerProps(workerProps)
+ .build();
+
+ // start the clusters
+ connect.start();
+
+ waitForCondition(
+ this::extensionIsRegistered,
+ REST_EXTENSION_REGISTRATION_TIMEOUT_MS,
+ "REST extension was never registered"
+ );
+ }
+
+ @After
+ public void close() {
+ // stop all Connect, Kafka and Zk threads.
+ connect.stop();
+ }
+
+ private boolean extensionIsRegistered() {
+ try {
+ String extensionUrl = connect.endpointForResource("integration-test-rest-extension/registered");
+ return "true".equals(connect.executeGet(extensionUrl));
+ } catch (ConnectRestException | IOException e) {
+ return false;
+ }
+ }
+
+ public static class IntegrationTestRestExtension implements ConnectRestExtension {
+
+ @Override
+ public void register(ConnectRestExtensionContext restPluginContext) {
+ restPluginContext.clusterState().connectors();
+ restPluginContext.configurable().register(new IntegrationTestRestExtensionResource());
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ }
+
+ @Override
+ public String version() {
+ return "test";
+ }
+
+ @Path("integration-test-rest-extension")
+ public static class IntegrationTestRestExtensionResource {
+
+ @GET
+ @Path("/registered")
+ public boolean isRegistered() {
+ return true;
+ }
+ }
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
index b232a4d..78780f3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.health;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.util.Callback;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -41,15 +40,13 @@ public class ConnectClusterStateImplTest {
@Mock
protected Herder herder;
- protected HerderProvider herderProvider;
protected ConnectClusterStateImpl connectClusterState;
protected Collection<String> expectedConnectors;
protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10);
@Before
public void setUp() {
- herderProvider = new HerderProvider(herder);
- connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herderProvider);
+ connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herder);
expectedConnectors = Arrays.asList("sink1", "source1", "source2");
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 128532f..3d297b7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -26,7 +26,6 @@ import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -175,7 +174,8 @@ public class RestServerTest {
PowerMock.replayAll();
server = new RestServer(workerConfig);
- server.start(new HerderProvider(herder), herder.plugins());
+ server.initializeServer();
+ server.initializeResources(herder);
HttpOptions request = new HttpOptions("/connectors");
request.addHeader("Content-Type", MediaType.WILDCARD);
@@ -218,7 +218,8 @@ public class RestServerTest {
PowerMock.replayAll();
server = new RestServer(workerConfig);
- server.start(new HerderProvider(herder), herder.plugins());
+ server.initializeServer();
+ server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");
request.addHeader("Referer", origin + "/page");
request.addHeader("Origin", origin);
@@ -275,7 +276,8 @@ public class RestServerTest {
PowerMock.replayAll();
server = new RestServer(workerConfig);
- server.start(new HerderProvider(herder), herder.plugins());
+ server.initializeServer();
+ server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");
CloseableHttpClient httpClient = HttpClients.createMinimal();
HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 684064d..a3aee6a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.TestSinkConnector;
import org.apache.kafka.connect.runtime.TestSourceConnector;
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -187,7 +186,7 @@ public class ConnectorPluginsResourceTest {
plugins = PowerMock.createMock(Plugins.class);
herder = PowerMock.createMock(AbstractHerder.class);
- connectorPluginsResource = new ConnectorPluginsResource(new HerderProvider(herder));
+ connectorPluginsResource = new ConnectorPluginsResource(herder);
}
private void expectPlugins() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 5a52074..f84cd25 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
@@ -127,7 +126,7 @@ public class ConnectorsResourceTest {
public void setUp() throws NoSuchMethodException {
PowerMock.mockStatic(RestClient.class,
RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
- connectorsResource = new ConnectorsResource(new HerderProvider(herder), null);
+ connectorsResource = new ConnectorsResource(herder, null);
}
private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
index be80e28..4e928a3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.resources;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.HerderProvider;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
@@ -40,7 +39,7 @@ public class RootResourceTest extends EasyMockSupport {
@Before
public void setUp() {
- rootResource = new RootResource(new HerderProvider(herder));
+ rootResource = new RootResource(herder);
}
@Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 590649b..e610812 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -333,7 +333,7 @@ public class EmbeddedConnectCluster {
}
}
- private String endpointForResource(String resource) throws IOException {
+ public String endpointForResource(String resource) throws IOException {
String url = connectCluster.stream()
.map(WorkerHandle::url)
.filter(Objects::nonNull)