You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/30 04:38:00 UTC

[jira] [Commented] (KAFKA-6776) Connect Rest Extension Plugin

    [ https://issues.apache.org/jira/browse/KAFKA-6776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494673#comment-16494673 ] 

ASF GitHub Bot commented on KAFKA-6776:
---------------------------------------

ewencp closed pull request #4931: KAFKA-6776 : ConnectRestExtension Interfaces & Rest integration
URL: https://github.com/apache/kafka/pull/4931
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 474329a273d..4f3fd770b78 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1207,6 +1207,7 @@ project(':connect:api') {
   dependencies {
     compile project(':clients')
     compile libs.slf4jApi
+    compile libs.jerseyContainerServlet
 
     testCompile libs.junit
 
@@ -1431,6 +1432,45 @@ project(':connect:file') {
   }
 }
 
+project(':connect:basic-auth-extension') {
+  archivesBaseName = "connect-basic-auth-extension"
+
+  dependencies {
+    compile project(':connect:api')
+    compile libs.slf4jApi
+
+    testCompile libs.bcpkix
+    testCompile libs.easymock
+    testCompile libs.junit
+    testCompile libs.powermockJunit4
+    testCompile libs.powermockEasymock
+    testCompile project(':clients').sourceSets.test.output
+
+    testRuntime libs.slf4jlog4j
+  }
+
+  javadoc {
+    enabled = false
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+      include('log4j*jar')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('connect-*')
+    }
+    into "$buildDir/dependant-libs"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
 task aggregatedJavadoc(type: Javadoc) {
   def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
   source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e4f9a4e8f59..5549205bd13 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -290,6 +290,7 @@
     <allow pkg="org.apache.kafka.connect.data" />
     <allow pkg="org.apache.kafka.connect.errors" />
     <allow pkg="org.apache.kafka.connect.header" />
+    <allow pkg="org.apache.kafka.connect.components"/>
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.test"/>
 
@@ -307,7 +308,16 @@
     <subpackage name="converters">
       <allow pkg="org.apache.kafka.connect.storage" />
     </subpackage>
-    
+
+    <subpackage name="rest">
+      <allow pkg="org.apache.kafka.connect.health" />
+      <allow pkg="javax.ws.rs" />
+      <allow pkg= "javax.security.auth"/>
+      <subpackage name="basic">
+        <allow pkg="org.apache.kafka.connect.rest"/>
+      </subpackage>
+    </subpackage>
+
     <subpackage name="runtime">
       <allow pkg="org.apache.kafka.connect" />
       <allow pkg="org.reflections"/>
@@ -327,6 +337,8 @@
       </subpackage>
     </subpackage>
 
+
+
     <subpackage name="cli">
       <allow pkg="org.apache.kafka.connect.runtime" />
       <allow pkg="org.apache.kafka.connect.storage" />
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java b/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java
new file mode 100644
index 00000000000..adabe8fbce1
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.components;
+
+/**
+ * Connect requires some components implement this interface to define a version string.
+ */
+public interface Versioned {
+    /**
+     * Get the version of this component.
+     *
+     * @return the version, formatted as a String. The version may not be {@code null} or empty.
+     */
+    String version();
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 30dfd3cc9e0..329429b4e93 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.components.Versioned;
 
 import java.util.List;
 import java.util.Map;
@@ -41,16 +42,10 @@
  * Tasks.
  * </p>
  */
-public abstract class Connector {
+public abstract class Connector implements Versioned {
 
     protected ConnectorContext context;
 
-    /**
-     * Get the version of this connector.
-     *
-     * @return the version, formatted as a String
-     */
-    public abstract String version();
 
     /**
      * Initialize this connector, using the provided ConnectorContext to notify the runtime of
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
new file mode 100644
index 00000000000..a18c46334cf
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+/**
+ * Provides the current state, the associated worker ID, and any error trace message for a connector or task.
+ */
+public abstract class AbstractState {
+
+    private final String state;
+    private final String traceMessage;
+    private final String workerId;
+
+    /**
+     * Construct a state for connector or task.
+     *
+     * @param state  the status of connector or task; may not be null or empty
+     * @param workerId  the workerId associated with the connector or the task; may not be null or empty
+     * @param traceMessage  any error trace message associated with the connector or the task; may be null or empty
+     */
+    public AbstractState(String state, String workerId, String traceMessage) {
+        if (state == null || state.trim().isEmpty()) { // reject a missing or blank state
+            throw new IllegalArgumentException("State must not be null or empty");
+        }
+        if (workerId == null || workerId.trim().isEmpty()) { // reject a missing or blank worker ID
+            throw new IllegalArgumentException("Worker ID must not be null or empty");
+        }
+        this.state = state;
+        this.workerId = workerId;
+        this.traceMessage = traceMessage;
+    }
+
+    /**
+     * Provides the current state of the connector or task.
+     *
+     * @return state, never {@code null} or empty
+     */
+    public String state() {
+        return state;
+    }
+
+    /**
+     * The identifier of the worker associated with the connector or the task.
+     *
+     * @return workerId, never {@code null} or empty.
+     */
+    public String workerId() {
+        return workerId;
+    }
+
+    /**
+     * The error message associated with the connector or task.
+     *
+     * @return traceMessage, can be {@code null} or empty.
+     */
+    public String traceMessage() {
+        return traceMessage;
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
new file mode 100644
index 00000000000..d4292efd0ca
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+import java.util.Collection;
+
+/**
+ * Provides the ability to look up connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension}
+ * implementations. The Connect framework provides the implementation for this interface.
+ */
+public interface ConnectClusterState {
+
+    /**
+     * Get the names of the connectors currently deployed in this cluster. This is a full list of connectors in the cluster gathered from
+     * the current configuration, which may change over time.
+     *
+     * @return collection of connector names, never {@code null}
+     */
+    Collection<String> connectors();
+
+    /**
+     * Look up the current health of a connector and its tasks. This provides the current snapshot of health by querying the underlying
+     * herder. A connector returned by a previous invocation of {@link #connectors()} may no longer be available, in which case this method
+     * throws {@link org.apache.kafka.connect.errors.NotFoundException}.
+     *
+     * @param connName name of the connector
+     * @return the health of the connector for the connector name
+     * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
+     */
+    ConnectorHealth connectorHealth(String connName);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
new file mode 100644
index 00000000000..3a9efd15372
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.health;
+
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Provides basic health information about the connector and its tasks.
+ */
+public class ConnectorHealth {
+
+    private final String name;
+    private final ConnectorState connectorState;
+    private final Map<Integer, TaskState> tasks;
+    private final ConnectorType type;
+
+
+    public ConnectorHealth(String name,
+                           ConnectorState connectorState,
+                           Map<Integer, TaskState> tasks,
+                           ConnectorType type) {
+        if (name == null || name.trim().isEmpty()) { // reject a missing or blank connector name
+            throw new IllegalArgumentException("Connector name is required");
+        }
+        Objects.requireNonNull(connectorState, "connectorState can't be null");
+        Objects.requireNonNull(tasks, "tasks can't be null");
+        Objects.requireNonNull(type, "type can't be null");
+        this.name = name;
+        this.connectorState = connectorState;
+        this.tasks = tasks;
+        this.type = type;
+    }
+
+    /**
+     * Provides the name of the connector.
+     *
+     * @return name, never {@code null} or empty
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Provides the current state of the connector.
+     *
+     * @return the connector state, never {@code null}
+     */
+    public ConnectorState connectorState() {
+        return connectorState;
+    }
+
+    /**
+     * Provides the current state of the connector tasks.
+     *
+     * @return the state for each task ID; never {@code null}
+     */
+    public Map<Integer, TaskState> tasksState() {
+        return tasks;
+    }
+
+    /**
+     * Provides the type of the connector.
+     *
+     * @return type, never {@code null}
+     */
+    public ConnectorType type() {
+        return type;
+    }
+
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
new file mode 100644
index 00000000000..d5571bc4ff5
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+/**
+ * Describes the status, worker ID, and any errors associated with a connector.
+ */
+public class ConnectorState extends AbstractState {
+
+    /**
+     * Provides an instance of the ConnectorState.
+     *
+     * @param state - the status of connector, may not be {@code null} or empty
+     * @param workerId - the workerId associated with the connector, may not be {@code null} or empty
+     * @param traceMessage - any error message associated with the connector, may be {@code null} or empty
+     */
+    public ConnectorState(String state, String workerId, String traceMessage) {
+        super(state, workerId, traceMessage);
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java
new file mode 100644
index 00000000000..fa9db6f6b60
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.health;
+
+import java.util.Locale;
+
+/**
+ * Enum definition that identifies the type of the connector.
+ */
+public enum ConnectorType {
+    /**
+     * Identifies a source connector
+     */
+    SOURCE,
+    /**
+     * Identifies a sink connector
+     */
+    SINK,
+    /**
+     * Identifies a connector whose type could not be inferred
+     */
+    UNKNOWN;
+
+    @Override
+    public String toString() {
+        return super.toString().toLowerCase(Locale.ROOT);
+    }
+
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
new file mode 100644
index 00000000000..1c1be159970
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+import java.util.Objects;
+
+/**
+ * Describes the state, IDs, and any errors of a connector task.
+ */
+public class TaskState extends AbstractState {
+
+    private final int taskId;
+
+    /**
+     * Provides an instance of {@link TaskState}.
+     *
+     * @param taskId   the id associated with the connector task
+     * @param state    the status of the task, may not be {@code null} or empty
+     * @param workerId id of the worker the task is associated with, may not be {@code null} or empty
+     * @param trace    error message if that task had failed or errored out, may be {@code null} or empty
+     */
+    public TaskState(int taskId, String state, String workerId, String trace) {
+        super(state, workerId, trace);
+        this.taskId = taskId;
+    }
+
+    /**
+     * Provides the ID of the task.
+     *
+     * @return the task ID
+     */
+    public int taskId() {
+        return taskId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TaskState taskState = (TaskState) o;
+
+        return taskId == taskState.taskId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(taskId);
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
new file mode 100644
index 00000000000..aa479a3d449
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.health.ConnectClusterState;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * A plugin interface to allow registration of new JAX-RS resources like Filters, REST endpoints, providers, etc. The implementations will
+ * be discovered using the standard Java {@link java.util.ServiceLoader} mechanism by Connect's plugin class loading mechanism.
+ *
+ * <p>The extension class(es) must be packaged as a plugin, with one JAR containing the implementation classes and a {@code
+ * META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension} file that contains the fully qualified name of the
+ * class(es) that implement the ConnectRestExtension interface. The plugin should also include the JARs of all dependencies except those
+ * already provided by the Connect framework.
+ *
+ * <p>To install into a Connect installation, add a directory named for the plugin and containing the plugin's JARs into a directory that is
+ * on Connect's {@code plugin.path}, and (re)start the Connect worker.
+ *
+ * <p>When the Connect worker process starts up, it will read its configuration and instantiate all of the REST extension implementation
+ * classes that are specified in the {@code rest.extension.classes} configuration property. Connect will then pass its configuration to each
+ * extension via the {@link Configurable#configure(Map)} method, and will then call {@link #register} with a provided context.
+ *
+ * <p>When the Connect worker shuts down, it will call the extension's {@link #close} method to allow the implementation to release all of
+ * its resources.
+ */
+public interface ConnectRestExtension extends Configurable, Versioned, Closeable {
+
+    /**
+     * ConnectRestExtension implementations can register custom JAX-RS resources via the {@link #register(ConnectRestExtensionContext)}
+     * method. The Connect framework will invoke this method after registering the default Connect resources. If an implementation attempts
+     * to re-register any of the Connect resources, the attempt will be ignored and logged.
+     *
+     * @param restPluginContext The context provides access to JAX-RS {@link javax.ws.rs.core.Configurable} and {@link
+     *                          ConnectClusterState}. The custom JAX-RS resources can be registered via the {@link
+     *                          ConnectRestExtensionContext#configurable()}
+     */
+    void register(ConnectRestExtensionContext restPluginContext);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
new file mode 100644
index 00000000000..76085971c37
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+
+import javax.ws.rs.core.Configurable;
+
+/**
+ * The interface provides the ability for {@link ConnectRestExtension} implementations to access the JAX-RS
+ * {@link javax.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided
+ * by the Connect framework.
+ */
+public interface ConnectRestExtensionContext {
+
+    /**
+     * Provides an implementation of {@link javax.ws.rs.core.Configurable} that can be used to register JAX-RS resources.
+     *
+     * @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never {@code null}
+     */
+    Configurable<? extends Configurable> configurable();
+
+    /**
+     * Provides the cluster state and health information about the connectors and tasks.
+     *
+     * @return the cluster state information; never {@code null}
+     */
+    ConnectClusterState clusterState();
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
new file mode 100644
index 00000000000..91d5d9ca00c
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link
+ * javax.security.auth.spi.LoginModule}. An entry with the name {@code KafkaConnect} is expected in the JAAS config file configured in the
+ * JVM. An implementation of {@link javax.security.auth.spi.LoginModule} needs to be provided in the JAAS config file. The {@code
+ * LoginModule} implementation should configure the {@link javax.security.auth.callback.CallbackHandler} with only {@link
+ * javax.security.auth.callback.NameCallback} and {@link javax.security.auth.callback.PasswordCallback}.
+ *
+ * <p>To use this extension, one needs to add the following config in the {@code worker.properties}
+ * <pre>
+ *     rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
+ * </pre>
+ *
+ * <p>An example JAAS config would look as below
+ * <pre>
+ *         KafkaConnect {
+ *              org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required
+ *              file="/mnt/secret/credentials.properties";
+ *         };
+ * </pre>
+ *
+ * <p>This is a reference implementation of the {@link ConnectRestExtension} interface. It registers an implementation of {@link
+ * javax.ws.rs.container.ContainerRequestFilter} that does JAAS based authentication of incoming Basic Auth credentials. {@link
+ * ConnectRestExtension} implementations are loaded via the plugin class loader using {@link java.util.ServiceLoader} mechanism and hence
+ * the packaged jar includes {@code META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension} with the entry
+ * {@code org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension}
+ *
+ * <p><b>NOTE: The implementation ships with a default {@link PropertyFileLoginModule} that helps authenticate the request against a
+ * property file. {@link PropertyFileLoginModule} is NOT intended to be used in production since the credentials are stored in PLAINTEXT. One can use
+ * this extension in production by using their own implementation of {@link javax.security.auth.spi.LoginModule} that authenticates against
+ * stores like LDAP, DB, etc.</b>
+ */
+public class BasicAuthSecurityRestExtension implements ConnectRestExtension {
+
+    @Override
+    public void register(ConnectRestExtensionContext restPluginContext) {
+        restPluginContext.configurable().register(JaasBasicAuthFilter.class);
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
new file mode 100644
index 00000000000..7231700af7c
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerRequestFilter;
+import javax.ws.rs.core.Response;
+
+public class JaasBasicAuthFilter implements ContainerRequestFilter {
+
+    private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
+    static final String AUTHORIZATION = "Authorization";
+
+    @Override
+    public void filter(ContainerRequestContext requestContext) throws IOException {
+
+        try {
+            LoginContext loginContext =
+                new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
+                    requestContext.getHeaderString(AUTHORIZATION)));
+            loginContext.login();
+        } catch (LoginException | ConfigException e) {
+            requestContext.abortWith(
+                Response.status(Response.Status.UNAUTHORIZED)
+                    .entity("User cannot access the resource.")
+                    .build());
+        }
+    }
+
+
+    public static class BasicAuthCallBackHandler implements CallbackHandler {
+
+        private static final String BASIC = "basic";
+        private static final char COLON = ':';
+        private static final char SPACE = ' ';
+        private String username;
+        private String password;
+
+        public BasicAuthCallBackHandler(String credentials) {
+            if (credentials != null) {
+                int space = credentials.indexOf(SPACE);
+                if (space > 0) {
+                    String method = credentials.substring(0, space);
+                    if (BASIC.equalsIgnoreCase(method)) {
+                        credentials = credentials.substring(space + 1);
+                        credentials = new String(Base64.getDecoder().decode(credentials),
+                                                 StandardCharsets.UTF_8);
+                        int i = credentials.indexOf(COLON);
+                        if (i > 0) {
+                            username = credentials.substring(0, i);
+                            password = credentials.substring(i + 1);
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    ((NameCallback) callback).setName(username);
+                } else if (callback instanceof PasswordCallback) {
+                    ((PasswordCallback) callback).setPassword(password.toCharArray());
+                } else {
+                    throw new UnsupportedCallbackException(callback, "Supports only NameCallback "
+                                                                     + "and PasswordCallback");
+                }
+            }
+        }
+    }
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
new file mode 100644
index 00000000000..7af7863b2ce
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+/**
+ * {@link PropertyFileLoginModule} authenticates against a properties file.
+ * The credentials should be stored in the format {username}={password} in the properties file.
+ * The absolute path of the file needs to be specified using the option <b>file</b>.
+ *
+ * <p><b>NOTE: This implementation is NOT intended to be used in production since the credentials are stored in PLAINTEXT in the
+ * properties file.</b>
+ */
+public class PropertyFileLoginModule implements LoginModule {
+    private static final Logger log = LoggerFactory.getLogger(PropertyFileLoginModule.class);
+
+    private CallbackHandler callbackHandler;
+    private static final String FILE_OPTIONS = "file";
+    private String fileName;
+    private boolean authenticated;
+
+    private static Map<String, Properties> credentialPropertiesMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
+        this.callbackHandler = callbackHandler;
+        fileName = (String) options.get(FILE_OPTIONS);
+        if (fileName == null || fileName.trim().isEmpty()) {
+            throw new ConfigException("Property Credentials file must be specified");
+        }
+        if (!credentialPropertiesMap.containsKey(fileName)) {
+            Properties credentialProperties = new Properties();
+            try {
+                credentialProperties.load(Files.newInputStream(Paths.get(fileName)));
+                credentialPropertiesMap.putIfAbsent(fileName, credentialProperties);
+            } catch (IOException e) {
+                log.error("Error loading credentials file ", e);
+                throw new ConfigException("Error loading Property Credentials file");
+            }
+        }
+    }
+
+    @Override
+    public boolean login() throws LoginException {
+        Callback[] callbacks = configureCallbacks();
+        try {
+            callbackHandler.handle(callbacks);
+        } catch (Exception e) {
+            throw new LoginException(e.getMessage());
+        }
+
+        String username = ((NameCallback) callbacks[0]).getName();
+        char[] passwordChars = ((PasswordCallback) callbacks[1]).getPassword();
+        String password = passwordChars != null ? new String(passwordChars) : null;
+        Properties credentialProperties = credentialPropertiesMap.get(fileName);
+        authenticated = credentialProperties.isEmpty() ||
+                        (password != null && password.equals(credentialProperties.get(username)));
+        return authenticated;
+    }
+
+    @Override
+    public boolean commit() throws LoginException {
+        return authenticated;
+    }
+
+    @Override
+    public boolean abort() throws LoginException {
+        return true;
+    }
+
+    @Override
+    public boolean logout() throws LoginException {
+        return true;
+    }
+
+    private Callback[] configureCallbacks() {
+
+        Callback[] callbacks = new Callback[2];
+        callbacks[0] = new NameCallback("Enter user name");
+        callbacks[1] = new PasswordCallback("Enter password", false);
+        return callbacks;
+    }
+}
diff --git a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
new file mode 100644
index 00000000000..098c9473d05
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
\ No newline at end of file
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
new file mode 100644
index 00000000000..80299f81c29
--- /dev/null
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.security.JaasUtils;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import javax.security.auth.login.Configuration;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.Response;
+
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.*")
+public class JaasBasicAuthFilterTest {
+
+    @MockStrict
+    private ContainerRequestContext requestContext;
+
+    private JaasBasicAuthFilter jaasBasicAuthFilter = new JaasBasicAuthFilter();
+    private String previousJaasConfig;
+    private Configuration previousConfiguration;
+
+    @Before
+    public void setup() throws IOException {
+        EasyMock.reset(requestContext);
+    }
+
+    @After
+    public void tearDown() {
+        if (previousJaasConfig != null) {
+            System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, previousJaasConfig);
+        }
+        Configuration.setConfiguration(previousConfiguration);
+    }
+
+    @Test
+    public void testSuccess() throws IOException {
+        File credentialFile = File.createTempFile("credential", ".properties");
+        credentialFile.deleteOnExit();
+        List<String> lines = new ArrayList<>();
+        lines.add("user=password");
+        lines.add("user1=password1");
+        Files.write(credentialFile.toPath(), lines, StandardCharsets.UTF_8);
+
+        setupJaasConfig("KafkaConnect", credentialFile.getPath(), true);
+        setMock("Basic", "user", "password", false);
+
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+
+    @Test
+    public void testBadCredential() throws IOException {
+        setMock("Basic", "user1", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testBadPassword() throws IOException {
+        setMock("Basic", "user", "password1", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testUnknownBearer() throws IOException {
+        setMock("Unknown", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testUnknownLoginModule() throws IOException {
+        setupJaasConfig("KafkaConnect1", "/tmp/testcrednetial", true);
+        Configuration.setConfiguration(null);
+        setMock("Basic", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testUnknownCredentialsFile() throws IOException {
+        setupJaasConfig("KafkaConnect", "/tmp/testcrednetial", true);
+        Configuration.setConfiguration(null);
+        setMock("Basic", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testEmptyCredentialsFile() throws IOException {
+        File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+        jaasConfigFile.deleteOnExit();
+        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+        setupJaasConfig("KafkaConnect", "", true);
+        Configuration.setConfiguration(null);
+        setMock("Basic", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testNoFileOption() throws IOException {
+        File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+        jaasConfigFile.deleteOnExit();
+        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+        setupJaasConfig("KafkaConnect", "", false);
+        Configuration.setConfiguration(null);
+        setMock("Basic", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    private void setMock(String authorization, String username, String password, boolean exceptionCase) {
+        String authHeader = authorization + " " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
+        EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
+            .andReturn(authHeader);
+        if (exceptionCase) {
+            requestContext.abortWith(EasyMock.anyObject(Response.class));
+            EasyMock.expectLastCall();
+        }
+        replayAll();
+    }
+
+    private void setupJaasConfig(String loginModule, String credentialFilePath, boolean includeFileOptions) throws IOException {
+        File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+        jaasConfigFile.deleteOnExit();
+        previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+
+        List<String> lines;
+        lines = new ArrayList<>();
+        lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required ");
+        if (includeFileOptions) {
+            lines.add("file=\"" + credentialFilePath + "\"");
+        }
+        lines.add(";};");
+        Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
+        previousConfiguration = Configuration.getConfiguration();
+        Configuration.setConfiguration(null);
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index c9c32e7f066..3c76d0fa5f2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -190,6 +190,13 @@
             + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
             + "/opt/connectors";
 
+    public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
+    protected static final String REST_EXTENSION_CLASSES_DOC =
+            "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
+            + "in the order specified. Implementing the interface  "
+            + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources  like filters. "
+            + "Typically used to add custom capability like logging, security, etc.";
+
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
     public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
@@ -254,7 +261,9 @@ protected static ConfigDef baseConfigDef() {
                         ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         HEADER_CONVERTER_CLASS_DEFAULT,
-                        Importance.LOW, HEADER_CONVERTER_CLASS_DOC);
+                        Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
+                .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
+                        Importance.LOW, REST_EXTENSION_CLASSES_DOC);
     }
 
     private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
new file mode 100644
index 00000000000..a0f7fdeea9f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.health;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+import org.apache.kafka.connect.health.ConnectorHealth;
+import org.apache.kafka.connect.health.ConnectorState;
+import org.apache.kafka.connect.health.ConnectorType;
+import org.apache.kafka.connect.health.TaskState;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.Callback;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConnectClusterStateImpl implements ConnectClusterState {
+
+    private Herder herder;
+
+    public ConnectClusterStateImpl(Herder herder) {
+        this.herder = herder;
+    }
+
+    @Override
+    public Collection<String> connectors() {
+        final Collection<String> connectors = new ArrayList<>();
+        herder.connectors(new Callback<java.util.Collection<String>>() {
+            @Override
+            public void onCompletion(Throwable error, Collection<String> result) {
+                connectors.addAll(result);
+            }
+        });
+        return connectors;
+    }
+
+    @Override
+    public ConnectorHealth connectorHealth(String connName) {
+
+        ConnectorStateInfo state = herder.connectorStatus(connName);
+        ConnectorState connectorState = new ConnectorState(
+            state.connector().state(),
+            state.connector().workerId(),
+            state.connector().trace()
+        );
+        Map<Integer, TaskState> taskStates = taskStates(state.tasks());
+        ConnectorHealth connectorHealth = new ConnectorHealth(
+            connName,
+            connectorState,
+            taskStates,
+            ConnectorType.valueOf(state.type().name())
+        );
+        return connectorHealth;
+    }
+
+    private Map<Integer, TaskState> taskStates(List<ConnectorStateInfo.TaskState> states) {
+
+        Map<Integer, TaskState> taskStates = new HashMap<>();
+
+        for (ConnectorStateInfo.TaskState state : states) {
+            taskStates.put(
+                state.id(),
+                new TaskState(state.id(), state.workerId(), state.state(), state.trace())
+            );
+        }
+        return taskStates;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index c67dfb5f2de..b56bd1a7d91 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -64,6 +66,7 @@
     private final SortedSet<PluginDesc<Converter>> converters;
     private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
     private final SortedSet<PluginDesc<Transformation>> transformations;
+    private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
     private final List<String> pluginPaths;
     private final Map<Path, PluginClassLoader> activePaths;
 
@@ -77,6 +80,7 @@ public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
         this.converters = new TreeSet<>();
         this.headerConverters = new TreeSet<>();
         this.transformations = new TreeSet<>();
+        this.restExtensions = new TreeSet<>();
     }
 
     public DelegatingClassLoader(List<String> pluginPaths) {
@@ -99,6 +103,10 @@ public DelegatingClassLoader(List<String> pluginPaths) {
         return transformations;
     }
 
+    public Set<PluginDesc<ConnectRestExtension>> restExtensions() {
+        return restExtensions;
+    }
+
     public ClassLoader connectorLoader(Connector connector) {
         return connectorLoader(connector.getClass().getName());
     }
@@ -228,6 +236,8 @@ private void scanUrlsAndAddPlugins(
             headerConverters.addAll(plugins.headerConverters());
             addPlugins(plugins.transformations(), loader);
             transformations.addAll(plugins.transformations());
+            addPlugins(plugins.restExtensions(), loader);
+            restExtensions.addAll(plugins.restExtensions());
         }
 
         loadJdbcDrivers(loader);
@@ -281,7 +291,8 @@ private PluginScanResult scanPluginPath(
                 getPluginDesc(reflections, Connector.class, loader),
                 getPluginDesc(reflections, Converter.class, loader),
                 getPluginDesc(reflections, HeaderConverter.class, loader),
-                getPluginDesc(reflections, Transformation.class, loader)
+                getPluginDesc(reflections, Transformation.class, loader),
+                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
         );
     }
 
@@ -295,23 +306,29 @@ private PluginScanResult scanPluginPath(
         Collection<PluginDesc<T>> result = new ArrayList<>();
         for (Class<? extends T> plugin : plugins) {
             if (PluginUtils.isConcrete(plugin)) {
-                // Temporary workaround until all the plugins are versioned.
-                if (Connector.class.isAssignableFrom(plugin)) {
-                    result.add(
-                            new PluginDesc<>(
-                                    plugin,
-                                    ((Connector) plugin.newInstance()).version(),
-                                    loader
-                            )
-                    );
-                } else {
-                    result.add(new PluginDesc<>(plugin, "undefined", loader));
-                }
+                result.add(new PluginDesc<>(plugin, versionFor(plugin.newInstance()), loader));
+            } else {
+                log.debug("Skipping {} as it is not concrete implementation", plugin);
             }
         }
         return result;
     }
 
+    private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
+                                                                     ClassLoader loader) {
+
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+        Collection<PluginDesc<T>> result = new ArrayList<>();
+        for (T impl : serviceLoader) {
+            result.add(new PluginDesc<>(klass, versionFor(impl), loader));
+        }
+        return result;
+    }
+
+    private static <T>  String versionFor(T pluginImpl) {
+        return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : "undefined";
+    }
+
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
@@ -337,6 +354,7 @@ private void addAllAliases() {
         addAliases(converters);
         addAliases(headerConverters);
         addAliases(transformations);
+        addAliases(restExtensions);
     }
 
     private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index c680f088b6e..6f48e5694bd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -28,17 +29,20 @@
     private final Collection<PluginDesc<Converter>> converters;
     private final Collection<PluginDesc<HeaderConverter>> headerConverters;
     private final Collection<PluginDesc<Transformation>> transformations;
+    private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
 
     public PluginScanResult(
             Collection<PluginDesc<Connector>> connectors,
             Collection<PluginDesc<Converter>> converters,
             Collection<PluginDesc<HeaderConverter>> headerConverters,
-            Collection<PluginDesc<Transformation>> transformations
+            Collection<PluginDesc<Transformation>> transformations,
+            Collection<PluginDesc<ConnectRestExtension>> restExtensions
     ) {
         this.connectors = connectors;
         this.converters = converters;
         this.headerConverters = headerConverters;
         this.transformations = transformations;
+        this.restExtensions = restExtensions;
     }
 
     public Collection<PluginDesc<Connector>> connectors() {
@@ -57,10 +61,15 @@ public PluginScanResult(
         return transformations;
     }
 
+    public Collection<PluginDesc<ConnectRestExtension>> restExtensions() {
+        return restExtensions;
+    }
+
     public boolean isEmpty() {
         return connectors().isEmpty()
                && converters().isEmpty()
                && headerConverters().isEmpty()
-               && transformations().isEmpty();
+               && transformations().isEmpty()
+               && restExtensions().isEmpty();
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
index 5649213a2f3..918f9d73f56 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.Converter;
@@ -30,6 +31,7 @@
     CONNECTOR(Connector.class),
     CONVERTER(Converter.class),
     TRANSFORMATION(Transformation.class),
+    REST_EXTENSION(ConnectRestExtension.class),
     UNKNOWN(Object.class);
 
     private Class<?> klass;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 30c41cd7d17..96074106de0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
@@ -316,6 +318,53 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro
         return plugin;
     }
 
+
+    /**
+     * Instantiate and configure one plugin for each of the given class names. Instances that
+     * implement {@link Configurable} are configured with the original properties of the provided {@code config}.
+     *
+     * @param klassNames         the class names of the plugins that need to be instantiated and configured; may be {@code null}
+     * @param config             the configuration containing the {@link org.apache.kafka.connect.runtime.Worker}'s configuration; may not be {@code null}
+     * @param pluginKlass        the type of the plugin class that is being instantiated
+     * @return the instantiated and configured plugins of type {@code T}; an empty list if {@code klassNames} is {@code null} or empty
+     * @throws ConnectException if an implementation class could not be found or instantiated
+     */
+    public <T> List<T> newPlugins(List<String> klassNames, AbstractConfig config, Class<T> pluginKlass) {
+        List<T> plugins = new ArrayList<>();
+        if (klassNames != null) {
+            for (String klassName : klassNames) {
+                plugins.add(newPlugin(klassName, config, pluginKlass));
+            }
+        }
+        return plugins;
+    }
+
+    /**
+     * Instantiate a single plugin by class name. A plugin that implements {@link Versioned} must
+     * report a non-blank version, and a plugin that implements {@link Configurable} is configured
+     * with the original properties of the provided {@code config}.
+     *
+     * @param klassName          the class name of the plugin to instantiate
+     * @param config             the configuration whose original properties are passed to the plugin
+     * @param pluginKlass        the type of the plugin class that is being instantiated
+     * @return the instantiated and, where applicable, configured plugin
+     * @throws ConnectException if the class could not be found or instantiated, or if a
+     *                          {@link Versioned} plugin does not define a version
+     */
+    public <T> T newPlugin(String klassName, AbstractConfig config, Class<T> pluginKlass) {
+        Class<? extends T> klass;
+        try {
+            klass = pluginClass(delegatingLoader, klassName, pluginKlass);
+        } catch (ClassNotFoundException e) {
+            String msg = String.format("Failed to find any class that implements %s and which "
+                                       + "name matches %s", pluginKlass, klassName);
+            // Chain the original exception so the class-loading failure stays visible in the trace.
+            throw new ConnectException(msg, e);
+        }
+        T plugin = newPlugin(klass);
+        if (plugin == null) {
+            throw new ConnectException("Unable to instantiate '" + klassName + "'");
+        }
+        if (plugin instanceof Versioned) {
+            // Read the version once; it is validated before the plugin is configured.
+            String version = ((Versioned) plugin).version();
+            if (version == null || version.trim().isEmpty()) {
+                throw new ConnectException("Version not defined for '" + klassName + "'");
+            }
+        }
+        if (plugin instanceof Configurable) {
+            ((Configurable) plugin).configure(config.originals());
+        }
+        return plugin;
+    }
+
     /**
      * Get an instance of the give class specified by the given configuration key.
      *
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
new file mode 100644
index 00000000000..c9c2c3bbd82
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import org.glassfish.jersey.server.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+import javax.ws.rs.core.Configurable;
+import javax.ws.rs.core.Configuration;
+
+/**
+ * A {@link Configurable} that forwards every call to a wrapped {@link ResourceConfig}, but skips
+ * components that are already registered so that duplicate registrations are handled
+ * deterministically instead of being registered a second time.
+ */
+public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(ConnectRestConfigurable.class);
+
+    private static final boolean ALLOWED_TO_REGISTER = true;
+    private static final boolean NOT_ALLOWED_TO_REGISTER = false;
+
+    // The delegate; assigned once in the constructor.
+    private final ResourceConfig resourceConfig;
+
+    /**
+     * @param resourceConfig the configuration that all calls are delegated to; may not be {@code null}
+     */
+    public ConnectRestConfigurable(ResourceConfig resourceConfig) {
+        this.resourceConfig = Objects.requireNonNull(resourceConfig, "ResourceConfig can't be null");
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return resourceConfig.getConfiguration();
+    }
+
+    @Override
+    public ResourceConfig property(String name, Object value) {
+        return resourceConfig.property(name, value);
+    }
+
+    // Each register(...) overload below returns the delegate whether or not the component was
+    // actually registered; an already registered component is only logged and skipped.
+
+    @Override
+    public ResourceConfig register(Object component) {
+        if (allowedToRegister(component)) {
+            resourceConfig.register(component);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Object component, int priority) {
+        if (allowedToRegister(component)) {
+            resourceConfig.register(component, priority);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Object component, Map<Class<?>, Integer> contracts) {
+        if (allowedToRegister(component)) {
+            resourceConfig.register(component, contracts);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Object component, Class<?>... contracts) {
+        if (allowedToRegister(component)) {
+            resourceConfig.register(component, contracts);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Class<?> componentClass, Map<Class<?>, Integer> contracts) {
+        if (allowedToRegister(componentClass)) {
+            resourceConfig.register(componentClass, contracts);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Class<?> componentClass, Class<?>... contracts) {
+        if (allowedToRegister(componentClass)) {
+            resourceConfig.register(componentClass, contracts);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Class<?> componentClass, int priority) {
+        if (allowedToRegister(componentClass)) {
+            resourceConfig.register(componentClass, priority);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Class<?> componentClass) {
+        if (allowedToRegister(componentClass)) {
+            resourceConfig.register(componentClass);
+        }
+        return resourceConfig;
+    }
+
+    /**
+     * Check whether the given component instance may still be registered, logging a warning
+     * if the delegate reports it as already registered.
+     */
+    private boolean allowedToRegister(Object component) {
+        if (resourceConfig.isRegistered(component)) {
+            log.warn("The resource {} is already registered", component);
+            return NOT_ALLOWED_TO_REGISTER;
+        }
+        return ALLOWED_TO_REGISTER;
+    }
+
+    /**
+     * Check whether the given component class may still be registered, logging a warning
+     * if the delegate reports it as already registered.
+     */
+    private boolean allowedToRegister(Class<?> componentClass) {
+        if (resourceConfig.isRegistered(componentClass)) {
+            log.warn("The resource {} is already registered", componentClass);
+            return NOT_ALLOWED_TO_REGISTER;
+        }
+        return ALLOWED_TO_REGISTER;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
new file mode 100644
index 00000000000..cdf282fc907
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+
+import javax.ws.rs.core.Configurable;
+
+/**
+ * Simple holder-style implementation of {@link ConnectRestExtensionContext} that returns the
+ * {@link Configurable} and {@link ConnectClusterState} it was constructed with.
+ */
+public class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext {
+
+    // Both references are assigned once in the constructor and never replaced.
+    private final Configurable<? extends Configurable> configurable;
+    private final ConnectClusterState clusterState;
+
+    /**
+     * @param configurable the configurable returned by {@link #configurable()}
+     * @param clusterState the cluster state returned by {@link #clusterState()}
+     */
+    public ConnectRestExtensionContextImpl(
+        Configurable<? extends Configurable> configurable,
+        ConnectClusterState clusterState
+    ) {
+        this.configurable = configurable;
+        this.clusterState = clusterState;
+    }
+
+    @Override
+    public Configurable<? extends Configurable> configurable() {
+        return configurable;
+    }
+
+    @Override
+    public ConnectClusterState clusterState() {
+        return clusterState;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 8d7803edb39..5a589db8858 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -17,10 +17,14 @@
 package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@@ -45,8 +49,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -56,6 +59,9 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.UriBuilder;
+
 /**
  * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
  */
@@ -71,6 +77,8 @@
     private final WorkerConfig config;
     private Server jettyServer;
 
+    // Replaced in registerRestExtensions(); starts as a typed empty list so the close loop in
+    // stop() has nothing to iterate until extensions have been loaded.
+    private List<ConnectRestExtension> connectRestExtensions = Collections.emptyList();
+
     /**
      * Create a REST server for this herder using the specified configs.
      */
@@ -163,6 +171,8 @@ public void start(Herder herder) {
 
         resourceConfig.register(ConnectExceptionMapper.class);
 
+        registerRestExtensions(herder, resourceConfig);
+
         ServletContainer servletContainer = new ServletContainer(resourceConfig);
         ServletHolder servletHolder = new ServletHolder(servletContainer);
 
@@ -207,10 +217,19 @@ public void start(Herder herder) {
         log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
     }
 
+
+
     public void stop() {
         log.info("Stopping REST server");
 
         try {
+            for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
+                try {
+                    connectRestExtension.close();
+                } catch (IOException e) {
+                    log.warn("Error while invoking close on " + connectRestExtension.getClass(), e);
+                }
+            }
             jettyServer.stop();
             jettyServer.join();
         } catch (Exception e) {
@@ -280,6 +299,22 @@ ServerConnector findConnector(String protocol) {
         return null;
     }
 
+    /**
+     * Instantiate the extensions listed under {@link WorkerConfig#REST_EXTENSION_CLASSES_CONFIG}
+     * and invoke {@code register} on each of them with one shared extension context.
+     */
+    void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
+        connectRestExtensions = herder.plugins().newPlugins(
+            config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+            config, ConnectRestExtension.class);
+
+        // Build the context once; every extension receives the same instance.
+        ConnectRestConfigurable restConfigurable = new ConnectRestConfigurable(resourceConfig);
+        ConnectClusterStateImpl clusterState = new ConnectClusterStateImpl(herder);
+        ConnectRestExtensionContext extensionContext =
+            new ConnectRestExtensionContextImpl(restConfigurable, clusterState);
+
+        for (ConnectRestExtension extension : connectRestExtensions) {
+            extension.register(extensionContext);
+        }
+    }
+
     public static String urlJoin(String base, String path) {
         if (base.endsWith("/") && path.startsWith("/"))
             return base + path.substring(1);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 25c2cb10352..ca26e4eeaba 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -71,7 +71,8 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*",
+                  "org.apache.kafka.connect.runtime.isolation.*"})
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
     private static final String TOPIC = "topic";
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 877fe6b600f..5c8aa29f338 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -24,6 +24,8 @@
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.storage.Converter;
@@ -37,6 +39,7 @@
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -141,6 +144,26 @@ public void shouldInstantiateAndConfigureExplicitlySetHeaderConverterWithCurrent
         assertEquals("baz", this.headerConverter.configs.get("extra.config"));
     }
 
+    @Test
+    public void shouldInstantiateAndConfigureConnectRestExtension() {
+        // Put the test extension's class name under the REST extension config key.
+        props.put(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG,
+                  TestConnectRestExtension.class.getName());
+        createConfig();
+
+        List<ConnectRestExtension> loadedExtensions =
+            plugins.newPlugins(config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+                               config,
+                               ConnectRestExtension.class);
+        assertNotNull(loadedExtensions);
+        assertEquals("One Rest Extension expected", 1, loadedExtensions.size());
+
+        ConnectRestExtension firstExtension = loadedExtensions.get(0);
+        assertNotNull(firstExtension);
+        assertTrue("Should be instance of TestConnectRestExtension",
+                   firstExtension instanceof TestConnectRestExtension);
+
+        // The extension must have been handed the config's original properties.
+        Map<String, ?> receivedConfigs = ((TestConnectRestExtension) firstExtension).configs;
+        assertNotNull(receivedConfigs);
+        assertEquals(config.originals(), receivedConfigs);
+    }
+
     @Test
     public void shouldInstantiateAndConfigureDefaultHeaderConverter() {
         props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
@@ -243,6 +266,30 @@ public void close() throws IOException {
         }
     }
 
+
+    /**
+     * Minimal {@link ConnectRestExtension} whose only behavior is to remember the map passed to
+     * {@link #configure(Map)} in the public {@code configs} field.
+     */
+    public static class TestConnectRestExtension implements ConnectRestExtension {
+
+        public Map<String, ?> configs;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+        }
+
+        @Override
+        public String version() {
+            return "test";
+        }
+
+        @Override
+        public void register(ConnectRestExtensionContext restPluginContext) {
+            // intentionally empty
+        }
+
+        @Override
+        public void close() throws IOException {
+            // intentionally empty
+        }
+    }
+
     public static class TestInternalConverter extends JsonConverter {
         public Map<String, ?> configs;
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index d26aa04149f..2f8704ae4e8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -17,9 +17,11 @@
 package org.apache.kafka.connect.runtime.rest;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.Callback;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -32,18 +34,20 @@
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
 import static org.junit.Assert.assertEquals;
 
 @RunWith(PowerMockRunner.class)
@@ -51,6 +55,8 @@
 
     @MockStrict
     private Herder herder;
+    @MockStrict
+    private Plugins plugins;
     private RestServer server;
 
     @After
@@ -151,8 +157,19 @@ public void testAdvertisedUri() {
 
     public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) {
         // To be able to set the Origin, we need to toggle this flag
+
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
+        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
         System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
 
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.EMPTY_LIST,
+                                           workerConfig,
+                                           ConnectRestExtension.class))
+            .andStubReturn(Collections.EMPTY_LIST);
+
         final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
         herder.connectors(EasyMock.capture(connectorsCallback));
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@@ -165,10 +182,7 @@ public Object answer() throws Throwable {
 
         PowerMock.replayAll();
 
-        Map<String, String> workerProps = baseWorkerProps();
-        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
-        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
-        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
         server = new RestServer(workerConfig);
         server.start(herder);
 
diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
new file mode 100644
index 00000000000..0a1ef88924e
--- /dev/null
+++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConnectRestExtension
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 7082ddd9e05..bbcdc3174b8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,4 +17,4 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scal
         'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
         'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
         'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
-        'jmh-benchmarks'
+        'connect:basic-auth-extension', 'jmh-benchmarks'


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Connect Rest Extension Plugin
> -----------------------------
>
>                 Key: KAFKA-6776
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6776
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Magesh kumar Nandakumar
>            Assignee: Magesh kumar Nandakumar
>            Priority: Major
>             Fix For: 2.0.0
>
>
> This covers the connect Rest extension plugin covered at KIP-285  https://cwiki.apache.org/confluence/display/KAFKA/KIP+285+-+Connect+Rest+Extension+Plugin



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)