You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/05/30 04:36:10 UTC

[kafka] branch trunk updated: KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285)

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9809495  KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285)
9809495 is described below

commit 98094954a27849b027831a7401e965c6a949790c
Author: Magesh Nandakumar <ma...@gmail.com>
AuthorDate: Tue May 29 21:35:22 2018 -0700

    KAFKA-6776: ConnectRestExtension Interfaces & Rest integration (KIP-285)
    
    This PR provides the implementation for KIP-285 and also a reference implementation for authenticating BasicAuth credentials using JAAS LoginModule
    
    Author: Magesh Nandakumar <ma...@gmail.com>
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Arjun Satish <wi...@users.noreply.github.com>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #4931 from mageshn/KIP-285
---
 build.gradle                                       |  40 +++++
 checkstyle/import-control.xml                      |  14 +-
 .../apache/kafka/connect/components/Versioned.java |  30 ++++
 .../apache/kafka/connect/connector/Connector.java  |   9 +-
 .../apache/kafka/connect/health/AbstractState.java |  74 +++++++++
 .../kafka/connect/health/ConnectClusterState.java  |  46 ++++++
 .../kafka/connect/health/ConnectorHealth.java      |  86 +++++++++++
 .../kafka/connect/health/ConnectorState.java       |  35 +++++
 .../apache/kafka/connect/health/ConnectorType.java |  43 ++++++
 .../org/apache/kafka/connect/health/TaskState.java |  69 +++++++++
 .../kafka/connect/rest/ConnectRestExtension.java   |  58 +++++++
 .../connect/rest/ConnectRestExtensionContext.java  |  44 ++++++
 .../extenstion/BasicAuthSecurityRestExtension.java |  79 ++++++++++
 .../basic/auth/extenstion/JaasBasicAuthFilter.java | 100 ++++++++++++
 .../auth/extenstion/PropertyFileLoginModule.java   | 116 ++++++++++++++
 ....apache.kafka.connect.rest.ConnectRestExtension |  16 ++
 .../auth/extenstion/JaasBasicAuthFilterTest.java   | 168 +++++++++++++++++++++
 .../apache/kafka/connect/runtime/WorkerConfig.java |  11 +-
 .../runtime/health/ConnectClusterStateImpl.java    |  86 +++++++++++
 .../runtime/isolation/DelegatingClassLoader.java   |  44 ++++--
 .../runtime/isolation/PluginScanResult.java        |  13 +-
 .../connect/runtime/isolation/PluginType.java      |   2 +
 .../kafka/connect/runtime/isolation/Plugins.java   |  49 ++++++
 .../runtime/rest/ConnectRestConfigurable.java      | 138 +++++++++++++++++
 .../rest/ConnectRestExtensionContextImpl.java      |  47 ++++++
 .../kafka/connect/runtime/rest/RestServer.java     |  39 ++++-
 .../connect/runtime/WorkerSourceTaskTest.java      |   3 +-
 .../connect/runtime/isolation/PluginsTest.java     |  47 ++++++
 .../kafka/connect/runtime/rest/RestServerTest.java |  32 ++--
 ....apache.kafka.connect.rest.ConnectRestExtension |  16 ++
 settings.gradle                                    |   2 +-
 31 files changed, 1519 insertions(+), 37 deletions(-)

diff --git a/build.gradle b/build.gradle
index 474329a..4f3fd77 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1207,6 +1207,7 @@ project(':connect:api') {
   dependencies {
     compile project(':clients')
     compile libs.slf4jApi
+    compile libs.jerseyContainerServlet
 
     testCompile libs.junit
 
@@ -1431,6 +1432,45 @@ project(':connect:file') {
   }
 }
 
+project(':connect:basic-auth-extension') {
+  archivesBaseName = "connect-basic-auth-extension"
+
+  dependencies {
+    compile project(':connect:api')
+    compile libs.slf4jApi
+
+    testCompile libs.bcpkix
+    testCompile libs.easymock
+    testCompile libs.junit
+    testCompile libs.powermockJunit4
+    testCompile libs.powermockEasymock
+    testCompile project(':clients').sourceSets.test.output
+
+    testRuntime libs.slf4jlog4j
+  }
+
+  javadoc {
+    enabled = false
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+      include('log4j*jar')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('connect-*')
+    }
+    into "$buildDir/dependant-libs"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
 task aggregatedJavadoc(type: Javadoc) {
   def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
   source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e4f9a4e..5549205 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -290,6 +290,7 @@
     <allow pkg="org.apache.kafka.connect.data" />
     <allow pkg="org.apache.kafka.connect.errors" />
     <allow pkg="org.apache.kafka.connect.header" />
+    <allow pkg="org.apache.kafka.connect.components"/>
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.test"/>
 
@@ -307,7 +308,16 @@
     <subpackage name="converters">
       <allow pkg="org.apache.kafka.connect.storage" />
     </subpackage>
-    
+
+    <subpackage name="rest">
+      <allow pkg="org.apache.kafka.connect.health" />
+      <allow pkg="javax.ws.rs" />
+      <allow pkg= "javax.security.auth"/>
+      <subpackage name="basic">
+        <allow pkg="org.apache.kafka.connect.rest"/>
+      </subpackage>
+    </subpackage>
+
     <subpackage name="runtime">
       <allow pkg="org.apache.kafka.connect" />
       <allow pkg="org.reflections"/>
@@ -327,6 +337,8 @@
       </subpackage>
     </subpackage>
 
+
+
     <subpackage name="cli">
       <allow pkg="org.apache.kafka.connect.runtime" />
       <allow pkg="org.apache.kafka.connect.storage" />
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java b/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java
new file mode 100644
index 0000000..adabe8f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/components/Versioned.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.components;
+
+/**
+ * Connect requires some components implement this interface to define a version string.
+ */
+public interface Versioned {
+    /**
+     * Get the version of this component.
+     *
+     * @return the version, formatted as a String. The version may not be (@code null} or empty.
+     */
+    String version();
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 30dfd3c..329429b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.components.Versioned;
 
 import java.util.List;
 import java.util.Map;
@@ -41,16 +42,10 @@ import java.util.Map;
  * Tasks.
  * </p>
  */
-public abstract class Connector {
+public abstract class Connector implements Versioned {
 
     protected ConnectorContext context;
 
-    /**
-     * Get the version of this connector.
-     *
-     * @return the version, formatted as a String
-     */
-    public abstract String version();
 
     /**
      * Initialize this connector, using the provided ConnectorContext to notify the runtime of
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
new file mode 100644
index 0000000..a18c463
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/AbstractState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+/**
+ * Provides the current status along with identifier for Connect worker and tasks.
+ */
+public abstract class AbstractState {
+
+    private final String state;
+    private final String traceMessage;
+    private final String workerId;
+
+    /**
+     * Construct a state for connector or task.
+     *
+     * @param state  the status of connector or task; may not be null or empty
+     * @param workerId  the workerId associated with the connector or the task; may not be null or empty
+     * @param traceMessage  any error trace message associated with the connector or the task; may be null or empty
+     */
+    public AbstractState(String state, String workerId, String traceMessage) {
+        if (state != null && !state.trim().isEmpty()) {
+            throw new IllegalArgumentException("State must not be null or empty");
+        }
+        if (workerId != null && !workerId.trim().isEmpty()) {
+            throw new IllegalArgumentException("Worker ID must not be null or empty");
+        }
+        this.state = state;
+        this.workerId = workerId;
+        this.traceMessage = traceMessage;
+    }
+
+    /**
+     * Provides the current state of the connector or task.
+     *
+     * @return state, never {@code null} or empty
+     */
+    public String state() {
+        return state;
+    }
+
+    /**
+     * The identifier of the worker associated with the connector or the task.
+     *
+     * @return workerId, never {@code null} or empty.
+     */
+    public String workerId() {
+        return workerId;
+    }
+
+    /**
+     * The error message associated with the connector or task.
+     *
+     * @return traceMessage, can be {@code null} or empty.
+     */
+    public String traceMessage() {
+        return traceMessage;
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
new file mode 100644
index 0000000..d4292ef
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+import java.util.Collection;
+
+/**
+ * Provides the ability to lookup connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension}
+ * implementations. The Connect framework provides the implementation for this interface.
+ */
+public interface ConnectClusterState {
+
+    /**
+     * Get the names of the connectors currently deployed in this cluster. This is a full list of connectors in the cluster gathered from
+     * the current configuration, which may change over time.
+     *
+     * @return collection of connector names, never {@code null}
+     */
+    Collection<String> connectors();
+
+    /**
+     * Lookup the current health of a connector and its tasks. This provides the current snapshot of health by querying the underlying
+     * herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
+     * org.apache.kafka.connect.errors.NotFoundException}.
+     *
+     * @param connName name of the connector
+     * @return the health of the connector for the connector name
+     * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
+     */
+    ConnectorHealth connectorHealth(String connName);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
new file mode 100644
index 0000000..3a9efd1
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorHealth.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.health;
+
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Provides basic health information about the connector and its tasks.
+ */
+public class ConnectorHealth {
+
+    private final String name;
+    private final ConnectorState connectorState;
+    private final Map<Integer, TaskState> tasks;
+    private final ConnectorType type;
+
+
+    public ConnectorHealth(String name,
+                           ConnectorState connectorState,
+                           Map<Integer, TaskState> tasks,
+                           ConnectorType type) {
+        if (name != null && !name.trim().isEmpty()) {
+            throw new IllegalArgumentException("Connector name is required");
+        }
+        Objects.requireNonNull(connectorState, "connectorState can't be null");
+        Objects.requireNonNull(tasks, "tasks can't be null");
+        Objects.requireNonNull(type, "type can't be null");
+        this.name = name;
+        this.connectorState = connectorState;
+        this.tasks = tasks;
+        this.type = type;
+    }
+
+    /**
+     * Provides the name of the connector.
+     *
+     * @return name, never {@code null} or empty
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Provides the current state of the connector.
+     *
+     * @return the connector state, never {@code null}
+     */
+    public ConnectorState connectorState() {
+        return connectorState;
+    }
+
+    /**
+     * Provides the current state of the connector tasks.
+     *
+     * @return the state for each task ID; never {@code null}
+     */
+    public Map<Integer, TaskState> tasksState() {
+        return tasks;
+    }
+
+    /**
+     * Provides the type of the connector.
+     *
+     * @return type, never {@code null}
+     */
+    public ConnectorType type() {
+        return type;
+    }
+
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
new file mode 100644
index 0000000..d5571bc
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+/**
+ * Describes the status, worker ID, and any errors associated with a connector.
+ */
+public class ConnectorState extends AbstractState {
+
+    /**
+     * Provides an instance of the ConnectorState.
+     *
+     * @param state - the status of connector, may not be {@code null} or empty
+     * @param workerId - the workerId associated with the connector, may not be {@code null} or empty
+     * @param traceMessage - any error message associated with the connector, may be {@code null} or empty
+     */
+    public ConnectorState(String state, String workerId, String traceMessage) {
+        super(state, workerId, traceMessage);
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java
new file mode 100644
index 0000000..fa9db6f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectorType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.health;
+
+import java.util.Locale;
+
+/**
+ * Enum definition that identifies the type of the connector.
+ */
+public enum ConnectorType {
+    /**
+     * Identifies a source connector
+     */
+    SOURCE,
+    /**
+     * Identifies a sink connector
+     */
+    SINK,
+    /**
+     * Identifies a connector whose type could not be inferred
+     */
+    UNKNOWN;
+
+    @Override
+    public String toString() {
+        return super.toString().toLowerCase(Locale.ROOT);
+    }
+
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
new file mode 100644
index 0000000..1c1be15
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/health/TaskState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.health;
+
+import java.util.Objects;
+
+/**
+ * Describes the state, IDs, and any errors of a connector task.
+ */
+public class TaskState extends AbstractState {
+
+    private final int taskId;
+
+    /**
+     * Provides an instance of {@link TaskState}.
+     *
+     * @param taskId   the id associated with the connector task
+     * @param state    the status of the task, may not be {@code null} or empty
+     * @param workerId id of the worker the task is associated with, may not be {@code null} or empty
+     * @param trace    error message if that task had failed or errored out, may be {@code null} or empty
+     */
+    public TaskState(int taskId, String state, String workerId, String trace) {
+        super(state, workerId, trace);
+        this.taskId = taskId;
+    }
+
+    /**
+     * Provides the ID of the task.
+     *
+     * @return the task ID
+     */
+    public int taskId() {
+        return taskId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TaskState taskState = (TaskState) o;
+
+        return taskId == taskState.taskId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(taskId);
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
new file mode 100644
index 0000000..aa479a3
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.health.ConnectClusterState;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * A plugin interface to allow registration of new JAX-RS resources like Filters, REST endpoints, providers, etc. The implementations will
+ * be discovered using the standard Java {@link java.util.ServiceLoader} mechanism by  Connect's plugin class loading mechanism.
+ *
+ * <p>The extension class(es) must be packaged as a plugin, with one JAR containing the implementation classes and a {@code
+ * META-INF/services/org.apache.kafka.connect.rest.extension.ConnectRestExtension} file that contains the fully qualified name of the
+ * class(es) that implement the ConnectRestExtension interface. The plugin should also include the JARs of all dependencies except those
+ * already provided by the Connect framework.
+ *
+ * <p>To install into a Connect installation, add a directory named for the plugin and containing the plugin's JARs into a directory that is
+ * on Connect's {@code plugin.path}, and (re)start the Connect worker.
+ *
+ * <p>When the Connect worker process starts up, it will read its configuration and instantiate all of the REST extension implementation
+ * classes that are specified in the `rest.extension.classes` configuration property. Connect will then pass its configuration to each
+ * extension via the {@link Configurable#configure(Map)} method, and will then call {@link #register} with a provided context.
+ *
+ * <p>When the Connect worker shuts down, it will call the extension's {@link #close} method to allow the implementation to release all of
+ * its resources.
+ */
+public interface ConnectRestExtension extends Configurable, Versioned, Closeable {
+
+    /**
+     * ConnectRestExtension implementations can register custom JAX-RS resources via the {@link #register(ConnectRestExtensionContext)}
+     * method. The Connect framework will invoke this method after registering the default Connect resources. If the implementations attempt
+     * to re-register any of the Connect resources, it will be be ignored and will be logged.
+     *
+     * @param restPluginContext The context provides access to JAX-RS {@link javax.ws.rs.core.Configurable} and {@link
+     *                          ConnectClusterState}.The custom JAX-RS resources can be registered via the {@link
+     *                          ConnectRestExtensionContext#configurable()}
+     */
+    void register(ConnectRestExtensionContext restPluginContext);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
new file mode 100644
index 0000000..7608597
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+
+import javax.ws.rs.core.Configurable;
+
+/**
+ * The interface provides the ability for {@link ConnectRestExtension} implementations to access the JAX-RS
+ * {@link javax.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided
+ * by the Connect framework.
+ */
+public interface ConnectRestExtensionContext {
+
+    /**
+     * Provides an implementation of {@link javax.ws.rs.core.Configurable} that be used to register JAX-RS resources.
+     *
+     * @return @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never {@code null}
+     */
+    Configurable<? extends Configurable> configurable();
+
+    /**
+     * Provides the cluster state and health information about the connectors and tasks.
+     *
+     * @return the cluster state information; never {@code null}
+     */
+    ConnectClusterState clusterState();
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
new file mode 100644
index 0000000..91d5d9c
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link
+ * javax.security.auth.spi.LoginModule}. An entry with the name {@code KafkaConnect} is expected in the JAAS config file configured in the
+ * JVM. An implementation of {@link javax.security.auth.spi.LoginModule} needs to be provided in the JAAS config file. The {@code
+ * LoginModule} implementation should configure the {@link javax.security.auth.callback.CallbackHandler} with only {@link
+ * javax.security.auth.callback.NameCallback} and {@link javax.security.auth.callback.PasswordCallback}.
+ *
+ * <p>To use this extension, one needs to add the following config in the {@code worker.properties}
+ * <pre>
+ *     rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
+ * </pre>
+ *
+ * <p> An example JAAS config would look as below
+ * <Pre>
+ *         KafkaConnect {
+ *              org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required
+ *              file="/mnt/secret/credentials.properties";
+ *         };
+ *</Pre>
+ *
+ * <p>This is a reference implementation of the {@link ConnectRestExtension} interface. It registers an implementation of {@link
+ * javax.ws.rs.container.ContainerRequestFilter} that does JAAS based authentication of incoming Basic Auth credentials. {@link
+ * ConnectRestExtension} implementations are loaded via the plugin class loader using {@link java.util.ServiceLoader} mechanism and hence
+ * the packaged jar includes {@code META-INF/services/org.apache.kafka.connect.rest.extension.ConnectRestExtension} with the entry
+ * {@code org.apache.kafka.connect.extension.auth.jaas.BasicAuthSecurityRestExtension}
+ *
+ * <p><b>NOTE: The implementation ships with a default {@link PropertyFileLoginModule} that helps authenticate the request against a
+ * property file. {@link PropertyFileLoginModule} is NOT intended to be used in production since the credentials are stored in PLAINTEXT. One can use
+ * this extension in production by using their own implementation of {@link javax.security.auth.spi.LoginModule} that authenticates against
+ * stores like LDAP, DB, etc.</b>
+ */
+public class BasicAuthSecurityRestExtension implements ConnectRestExtension {
+
+    @Override
+    public void register(ConnectRestExtensionContext restPluginContext) {
+        restPluginContext.configurable().register(JaasBasicAuthFilter.class);
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
new file mode 100644
index 0000000..7231700
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.container.ContainerRequestFilter;
+import javax.ws.rs.core.Response;
+
+public class JaasBasicAuthFilter implements ContainerRequestFilter {
+
+    private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
+    static final String AUTHORIZATION = "Authorization";
+
+    @Override
+    public void filter(ContainerRequestContext requestContext) throws IOException {
+
+        try {
+            LoginContext loginContext =
+                new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
+                    requestContext.getHeaderString(AUTHORIZATION)));
+            loginContext.login();
+        } catch (LoginException | ConfigException e) {
+            requestContext.abortWith(
+                Response.status(Response.Status.UNAUTHORIZED)
+                    .entity("User cannot access the resource.")
+                    .build());
+        }
+    }
+
+
+    public static class BasicAuthCallBackHandler implements CallbackHandler {
+
+        private static final String BASIC = "basic";
+        private static final char COLON = ':';
+        private static final char SPACE = ' ';
+        private String username;
+        private String password;
+
+        public BasicAuthCallBackHandler(String credentials) {
+            if (credentials != null) {
+                int space = credentials.indexOf(SPACE);
+                if (space > 0) {
+                    String method = credentials.substring(0, space);
+                    if (BASIC.equalsIgnoreCase(method)) {
+                        credentials = credentials.substring(space + 1);
+                        credentials = new String(Base64.getDecoder().decode(credentials),
+                                                 StandardCharsets.UTF_8);
+                        int i = credentials.indexOf(COLON);
+                        if (i > 0) {
+                            username = credentials.substring(0, i);
+                            password = credentials.substring(i + 1);
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                if (callback instanceof NameCallback) {
+                    ((NameCallback) callback).setName(username);
+                } else if (callback instanceof PasswordCallback) {
+                    ((PasswordCallback) callback).setPassword(password.toCharArray());
+                } else {
+                    throw new UnsupportedCallbackException(callback, "Supports only NameCallback "
+                                                                     + "and PasswordCallback");
+                }
+            }
+        }
+    }
+}
diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
new file mode 100644
index 0000000..7af7863
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+/**
+ * {@link PropertyFileLoginModule} authenticates against a properties file.
+ * The credentials should be stored in the format {username}={password} in the properties file.
+ * The absolute path of the file needs to specified using the option <b>file</b>
+ *
+ * <p><b>NOTE: This implementation is NOT intended to be used in production since the credentials are stored in PLAINTEXT in the
+ * properties file.</b>
+ */
+public class PropertyFileLoginModule implements LoginModule {
+    private static final Logger log = LoggerFactory.getLogger(PropertyFileLoginModule.class);
+
+    private CallbackHandler callbackHandler;
+    private static final String FILE_OPTIONS = "file";
+    private String fileName;
+    private boolean authenticated;
+
+    private static Map<String, Properties> credentialPropertiesMap = new ConcurrentHashMap<>();
+
+    @Override
+    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
+        this.callbackHandler = callbackHandler;
+        fileName = (String) options.get(FILE_OPTIONS);
+        if (fileName == null || fileName.trim().isEmpty()) {
+            throw new ConfigException("Property Credentials file must be specified");
+        }
+        if (!credentialPropertiesMap.containsKey(fileName)) {
+            Properties credentialProperties = new Properties();
+            try {
+                credentialProperties.load(Files.newInputStream(Paths.get(fileName)));
+                credentialPropertiesMap.putIfAbsent(fileName, credentialProperties);
+            } catch (IOException e) {
+                log.error("Error loading credentials file ", e);
+                throw new ConfigException("Error loading Property Credentials file");
+            }
+        }
+    }
+
+    @Override
+    public boolean login() throws LoginException {
+        Callback[] callbacks = configureCallbacks();
+        try {
+            callbackHandler.handle(callbacks);
+        } catch (Exception e) {
+            throw new LoginException(e.getMessage());
+        }
+
+        String username = ((NameCallback) callbacks[0]).getName();
+        char[] passwordChars = ((PasswordCallback) callbacks[1]).getPassword();
+        String password = passwordChars != null ? new String(passwordChars) : null;
+        Properties credentialProperties = credentialPropertiesMap.get(fileName);
+        authenticated = credentialProperties.isEmpty() ||
+                        (password != null && password.equals(credentialProperties.get(username)));
+        return authenticated;
+    }
+
+    @Override
+    public boolean commit() throws LoginException {
+        return authenticated;
+    }
+
+    @Override
+    public boolean abort() throws LoginException {
+        return true;
+    }
+
+    @Override
+    public boolean logout() throws LoginException {
+        return true;
+    }
+
+    private Callback[] configureCallbacks() {
+
+        Callback[] callbacks = new Callback[2];
+        callbacks[0] = new NameCallback("Enter user name");
+        callbacks[1] = new PasswordCallback("Enter password", false);
+        return callbacks;
+    }
+}
diff --git a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
new file mode 100644
index 0000000..098c947
--- /dev/null
+++ b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension
\ No newline at end of file
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
new file mode 100644
index 0000000..80299f8
--- /dev/null
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.rest.basic.auth.extenstion;
+
+import org.apache.kafka.common.security.JaasUtils;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import javax.security.auth.login.Configuration;
+import javax.ws.rs.container.ContainerRequestContext;
+import javax.ws.rs.core.Response;
+
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.*")
+public class JaasBasicAuthFilterTest {
+
+    @MockStrict
+    private ContainerRequestContext requestContext;
+
+    private JaasBasicAuthFilter jaasBasicAuthFilter = new JaasBasicAuthFilter();
+    private String previousJaasConfig;
+    private Configuration previousConfiguration;
+
+    @Before
+    public void setup() throws IOException {
+        EasyMock.reset(requestContext);
+    }
+
+    @After
+    public void tearDown() {
+        if (previousJaasConfig != null) {
+            System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, previousJaasConfig);
+        }
+        Configuration.setConfiguration(previousConfiguration);
+    }
+
+    @Test
+    public void testSuccess() throws IOException {
+        File credentialFile = File.createTempFile("credential", ".properties");
+        credentialFile.deleteOnExit();
+        List<String> lines = new ArrayList<>();
+        lines.add("user=password");
+        lines.add("user1=password1");
+        Files.write(credentialFile.toPath(), lines, StandardCharsets.UTF_8);
+
+        setupJaasConfig("KafkaConnect", credentialFile.getPath(), true);
+        setMock("Basic", "user", "password", false);
+
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+
+    @Test
+    public void testBadCredential() throws IOException {
+        setMock("Basic", "user1", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testBadPassword() throws IOException {
+        setMock("Basic", "user", "password1", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testUnknownBearer() throws IOException {
+        setMock("Unknown", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testUnknownLoginModule() throws IOException {
+        setupJaasConfig("KafkaConnect1", "/tmp/testcrednetial", true);
+        Configuration.setConfiguration(null);
+        setMock("Basic", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testUnknownCredentialsFile() throws IOException {
+        setupJaasConfig("KafkaConnect", "/tmp/testcrednetial", true);
+        Configuration.setConfiguration(null);
+        setMock("Basic", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testEmptyCredentialsFile() throws IOException {
+        File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+        jaasConfigFile.deleteOnExit();
+        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+        setupJaasConfig("KafkaConnect", "", true);
+        Configuration.setConfiguration(null);
+        setMock("Basic", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    @Test
+    public void testNoFileOption() throws IOException {
+        File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+        jaasConfigFile.deleteOnExit();
+        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+        setupJaasConfig("KafkaConnect", "", false);
+        Configuration.setConfiguration(null);
+        setMock("Basic", "user", "password", true);
+        jaasBasicAuthFilter.filter(requestContext);
+    }
+
+    private void setMock(String authorization, String username, String password, boolean exceptionCase) {
+        String authHeader = authorization + " " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
+        EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
+            .andReturn(authHeader);
+        if (exceptionCase) {
+            requestContext.abortWith(EasyMock.anyObject(Response.class));
+            EasyMock.expectLastCall();
+        }
+        replayAll();
+    }
+
+    private void setupJaasConfig(String loginModule, String credentialFilePath, boolean includeFileOptions) throws IOException {
+        File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
+        jaasConfigFile.deleteOnExit();
+        previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
+
+        List<String> lines;
+        lines = new ArrayList<>();
+        lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required ");
+        if (includeFileOptions) {
+            lines.add("file=\"" + credentialFilePath + "\"");
+        }
+        lines.add(";};");
+        Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
+        previousConfiguration = Configuration.getConfiguration();
+        Configuration.setConfiguration(null);
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index c9c32e7..3c76d0f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -190,6 +190,13 @@ public class WorkerConfig extends AbstractConfig {
             + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
             + "/opt/connectors";
 
+    public static final String REST_EXTENSION_CLASSES_CONFIG = "rest.extension.classes";
+    protected static final String REST_EXTENSION_CLASSES_DOC =
+            "Comma-separated names of <code>ConnectRestExtension</code> classes, loaded and called "
+            + "in the order specified. Implementing the interface  "
+            + "<code>ConnectRestExtension</code> allows you to inject into Connect's REST API user defined resources  like filters. "
+            + "Typically used to add custom capability like logging, security, etc.";
+
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
     public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
@@ -254,7 +261,9 @@ public class WorkerConfig extends AbstractConfig {
                         ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         HEADER_CONVERTER_CLASS_DEFAULT,
-                        Importance.LOW, HEADER_CONVERTER_CLASS_DOC);
+                        Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
+                .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
+                        Importance.LOW, REST_EXTENSION_CLASSES_DOC);
     }
 
     private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
new file mode 100644
index 0000000..a0f7fde
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.health;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+import org.apache.kafka.connect.health.ConnectorHealth;
+import org.apache.kafka.connect.health.ConnectorState;
+import org.apache.kafka.connect.health.ConnectorType;
+import org.apache.kafka.connect.health.TaskState;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.util.Callback;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConnectClusterStateImpl implements ConnectClusterState {
+
+    private Herder herder;
+
+    public ConnectClusterStateImpl(Herder herder) {
+        this.herder = herder;
+    }
+
+    @Override
+    public Collection<String> connectors() {
+        final Collection<String> connectors = new ArrayList<>();
+        herder.connectors(new Callback<java.util.Collection<String>>() {
+            @Override
+            public void onCompletion(Throwable error, Collection<String> result) {
+                connectors.addAll(result);
+            }
+        });
+        return connectors;
+    }
+
+    @Override
+    public ConnectorHealth connectorHealth(String connName) {
+
+        ConnectorStateInfo state = herder.connectorStatus(connName);
+        ConnectorState connectorState = new ConnectorState(
+            state.connector().state(),
+            state.connector().workerId(),
+            state.connector().trace()
+        );
+        Map<Integer, TaskState> taskStates = taskStates(state.tasks());
+        ConnectorHealth connectorHealth = new ConnectorHealth(
+            connName,
+            connectorState,
+            taskStates,
+            ConnectorType.valueOf(state.type().name())
+        );
+        return connectorHealth;
+    }
+
+    private Map<Integer, TaskState> taskStates(List<ConnectorStateInfo.TaskState> states) {
+
+        Map<Integer, TaskState> taskStates = new HashMap<>();
+
+        for (ConnectorStateInfo.TaskState state : states) {
+            taskStates.put(
+                state.id(),
+                new TaskState(state.id(), state.workerId(), state.state(), state.trace())
+            );
+        }
+        return taskStates;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index c67dfb5..b56bd1a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -64,6 +66,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private final SortedSet<PluginDesc<Converter>> converters;
     private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
     private final SortedSet<PluginDesc<Transformation>> transformations;
+    private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
     private final List<String> pluginPaths;
     private final Map<Path, PluginClassLoader> activePaths;
 
@@ -77,6 +80,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         this.converters = new TreeSet<>();
         this.headerConverters = new TreeSet<>();
         this.transformations = new TreeSet<>();
+        this.restExtensions = new TreeSet<>();
     }
 
     public DelegatingClassLoader(List<String> pluginPaths) {
@@ -99,6 +103,10 @@ public class DelegatingClassLoader extends URLClassLoader {
         return transformations;
     }
 
+    public Set<PluginDesc<ConnectRestExtension>> restExtensions() {
+        return restExtensions;
+    }
+
     public ClassLoader connectorLoader(Connector connector) {
         return connectorLoader(connector.getClass().getName());
     }
@@ -228,6 +236,8 @@ public class DelegatingClassLoader extends URLClassLoader {
             headerConverters.addAll(plugins.headerConverters());
             addPlugins(plugins.transformations(), loader);
             transformations.addAll(plugins.transformations());
+            addPlugins(plugins.restExtensions(), loader);
+            restExtensions.addAll(plugins.restExtensions());
         }
 
         loadJdbcDrivers(loader);
@@ -281,7 +291,8 @@ public class DelegatingClassLoader extends URLClassLoader {
                 getPluginDesc(reflections, Connector.class, loader),
                 getPluginDesc(reflections, Converter.class, loader),
                 getPluginDesc(reflections, HeaderConverter.class, loader),
-                getPluginDesc(reflections, Transformation.class, loader)
+                getPluginDesc(reflections, Transformation.class, loader),
+                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader)
         );
     }
 
@@ -295,23 +306,29 @@ public class DelegatingClassLoader extends URLClassLoader {
         Collection<PluginDesc<T>> result = new ArrayList<>();
         for (Class<? extends T> plugin : plugins) {
             if (PluginUtils.isConcrete(plugin)) {
-                // Temporary workaround until all the plugins are versioned.
-                if (Connector.class.isAssignableFrom(plugin)) {
-                    result.add(
-                            new PluginDesc<>(
-                                    plugin,
-                                    ((Connector) plugin.newInstance()).version(),
-                                    loader
-                            )
-                    );
-                } else {
-                    result.add(new PluginDesc<>(plugin, "undefined", loader));
-                }
+                result.add(new PluginDesc<>(plugin, versionFor(plugin.newInstance()), loader));
+            } else {
+                log.debug("Skipping {} as it is not concrete implementation", plugin);
             }
         }
         return result;
     }
 
+    private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass,
+                                                                     ClassLoader loader) {
+
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+        Collection<PluginDesc<T>> result = new ArrayList<>();
+        for (T impl : serviceLoader) {
+            result.add(new PluginDesc<>(klass, versionFor(impl), loader));
+        }
+        return result;
+    }
+
+    private static <T>  String versionFor(T pluginImpl) {
+        return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : "undefined";
+    }
+
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
@@ -337,6 +354,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         addAliases(converters);
         addAliases(headerConverters);
         addAliases(transformations);
+        addAliases(restExtensions);
     }
 
     private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index c680f08..6f48e56 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
@@ -28,17 +29,20 @@ public class PluginScanResult {
     private final Collection<PluginDesc<Converter>> converters;
     private final Collection<PluginDesc<HeaderConverter>> headerConverters;
     private final Collection<PluginDesc<Transformation>> transformations;
+    private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
 
     public PluginScanResult(
             Collection<PluginDesc<Connector>> connectors,
             Collection<PluginDesc<Converter>> converters,
             Collection<PluginDesc<HeaderConverter>> headerConverters,
-            Collection<PluginDesc<Transformation>> transformations
+            Collection<PluginDesc<Transformation>> transformations,
+            Collection<PluginDesc<ConnectRestExtension>> restExtensions
     ) {
         this.connectors = connectors;
         this.converters = converters;
         this.headerConverters = headerConverters;
         this.transformations = transformations;
+        this.restExtensions = restExtensions;
     }
 
     public Collection<PluginDesc<Connector>> connectors() {
@@ -57,10 +61,15 @@ public class PluginScanResult {
         return transformations;
     }
 
+    public Collection<PluginDesc<ConnectRestExtension>> restExtensions() {
+        return restExtensions;
+    }
+
     public boolean isEmpty() {
         return connectors().isEmpty()
                && converters().isEmpty()
                && headerConverters().isEmpty()
-               && transformations().isEmpty();
+               && transformations().isEmpty()
+               && restExtensions().isEmpty();
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
index 5649213..918f9d7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.Converter;
@@ -30,6 +31,7 @@ public enum PluginType {
     CONNECTOR(Connector.class),
     CONVERTER(Converter.class),
     TRANSFORMATION(Transformation.class),
+    REST_EXTENSION(ConnectRestExtension.class),
     UNKNOWN(Object.class);
 
     private Class<?> klass;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 30c41cd..9607410 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.components.Versioned;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
@@ -316,6 +318,53 @@ public class Plugins {
         return plugin;
     }
 
+
+    /**
+     * If the given class names are available in the classloader, return a list of new configured
+     * instances. If the instances implement {@link Configurable}, they are configured with provided {@param config}
+     *
+     * @param klassNames         the list of class names of plugins that needs to instantiated and configured
+     * @param config             the configuration containing the {@link org.apache.kafka.connect.runtime.Worker}'s configuration; may not be {@code null}
+     * @param pluginKlass        the type of the plugin class that is being instantiated
+     * @return the instantiated and configured list of plugins of type <T>; empty list if the {@param klassNames} is {@code null} or empty
+     * @throws ConnectException if the implementation class could not be found
+     */
+    public <T> List<T> newPlugins(List<String> klassNames, AbstractConfig config, Class<T> pluginKlass) {
+        List<T> plugins = new ArrayList<>();
+        if (klassNames != null) {
+            for (String klassName : klassNames) {
+                plugins.add(newPlugin(klassName, config, pluginKlass));
+            }
+        }
+        return plugins;
+    }
+
+    public <T> T newPlugin(String klassName, AbstractConfig config, Class<T> pluginKlass) {
+        T plugin;
+        Class<? extends T> klass;
+        try {
+            klass = pluginClass(delegatingLoader, klassName, pluginKlass);
+        } catch (ClassNotFoundException e) {
+            String msg = String.format("Failed to find any class that implements %s and which "
+                                       + "name matches %s", pluginKlass, klassName);
+            throw new ConnectException(msg);
+        }
+        plugin = newPlugin(klass);
+        if (plugin == null) {
+            throw new ConnectException("Unable to instantiate '" + klassName + "'");
+        }
+        if (plugin instanceof Versioned) {
+            Versioned versionedPlugin = (Versioned) plugin;
+            if (versionedPlugin.version() == null || versionedPlugin.version().trim().isEmpty()) {
+                throw new ConnectException("Version not defined for '" + klassName + "'");
+            }
+        }
+        if (plugin instanceof Configurable) {
+            ((Configurable) plugin).configure(config.originals());
+        }
+        return plugin;
+    }
+
     /**
      * Get an instance of the give class specified by the given configuration key.
      *
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
new file mode 100644
index 0000000..c9c2c3b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import org.glassfish.jersey.server.ResourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+import javax.ws.rs.core.Configurable;
+import javax.ws.rs.core.Configuration;
+
+/**
+ * The implementation delegates to {@link ResourceConfig} so that we can handle duplicate
+ * registrations deterministically by not re-registering them again.
+ */
+public class ConnectRestConfigurable implements Configurable<ResourceConfig> {
+
+    private static final Logger log = LoggerFactory.getLogger(ConnectRestConfigurable.class);
+
+    private static final boolean ALLOWED_TO_REGISTER = true;
+    private static final boolean NOT_ALLOWED_TO_REGISTER = false;
+
+    private ResourceConfig resourceConfig;
+
+    public ConnectRestConfigurable(ResourceConfig resourceConfig) {
+        Objects.requireNonNull(resourceConfig, "ResourceConfig can't be null");
+        this.resourceConfig = resourceConfig;
+    }
+
+
+    @Override
+    public Configuration getConfiguration() {
+        return resourceConfig.getConfiguration();
+    }
+
+    @Override
+    public ResourceConfig property(String name, Object value) {
+        return resourceConfig.property(name, value);
+    }
+
+    @Override
+    public ResourceConfig register(Object component) {
+        if (allowedToRegister(component)) {
+            resourceConfig.register(component);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Object component, int priority) {
+        if (allowedToRegister(component)) {
+            resourceConfig.register(component, priority);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Object component, Map contracts) {
+        if (allowedToRegister(component)) {
+            resourceConfig.register(component, contracts);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Object component, Class[] contracts) {
+        if (allowedToRegister(component)) {
+            resourceConfig.register(component, contracts);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Class componentClass, Map contracts) {
+        if (allowedToRegister(componentClass)) {
+            resourceConfig.register(componentClass, contracts);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Class componentClass, Class[] contracts) {
+        if (allowedToRegister(componentClass)) {
+            resourceConfig.register(componentClass, contracts);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Class componentClass, int priority) {
+        if (allowedToRegister(componentClass)) {
+            resourceConfig.register(componentClass, priority);
+        }
+        return resourceConfig;
+    }
+
+    @Override
+    public ResourceConfig register(Class componentClass) {
+        if (allowedToRegister(componentClass)) {
+            resourceConfig.register(componentClass);
+        }
+        return resourceConfig;
+    }
+
+    private boolean allowedToRegister(Object component) {
+        if (resourceConfig.isRegistered(component)) {
+            log.warn("The resource {} is already registered", component);
+            return NOT_ALLOWED_TO_REGISTER;
+        }
+        return ALLOWED_TO_REGISTER;
+    }
+
+    private boolean allowedToRegister(Class componentClass) {
+        if (resourceConfig.isRegistered(componentClass)) {
+            log.warn("The resource {} is already registered", componentClass);
+            return NOT_ALLOWED_TO_REGISTER;
+        }
+        return ALLOWED_TO_REGISTER;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
new file mode 100644
index 0000000..cdf282f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.connect.health.ConnectClusterState;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
+
+import javax.ws.rs.core.Configurable;
+
+public class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext {
+
+    private Configurable<? extends Configurable> configurable;
+    private ConnectClusterState clusterState;
+
+    public ConnectRestExtensionContextImpl(
+        Configurable<? extends Configurable> configurable,
+        ConnectClusterState clusterState
+    ) {
+        this.configurable = configurable;
+        this.clusterState = clusterState;
+    }
+
+    @Override
+    public Configurable<? extends Configurable> configurable() {
+        return configurable;
+    }
+
+    @Override
+    public ConnectClusterState clusterState() {
+        return clusterState;
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 8d7803e..5a589db 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -17,10 +17,14 @@
 package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@@ -45,8 +49,7 @@ import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -56,6 +59,9 @@ import java.util.Locale;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.UriBuilder;
+
 /**
  * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
  */
@@ -71,6 +77,8 @@ public class RestServer {
     private final WorkerConfig config;
     private Server jettyServer;
 
+    private List<ConnectRestExtension> connectRestExtensions = Collections.EMPTY_LIST;
+
     /**
      * Create a REST server for this herder using the specified configs.
      */
@@ -163,6 +171,8 @@ public class RestServer {
 
         resourceConfig.register(ConnectExceptionMapper.class);
 
+        registerRestExtensions(herder, resourceConfig);
+
         ServletContainer servletContainer = new ServletContainer(resourceConfig);
         ServletHolder servletHolder = new ServletHolder(servletContainer);
 
@@ -207,10 +217,19 @@ public class RestServer {
         log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
     }
 
+
+
     public void stop() {
         log.info("Stopping REST server");
 
         try {
+            for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
+                try {
+                    connectRestExtension.close();
+                } catch (IOException e) {
+                    log.warn("Error while invoking close on " + connectRestExtension.getClass(), e);
+                }
+            }
             jettyServer.stop();
             jettyServer.join();
         } catch (Exception e) {
@@ -280,6 +299,22 @@ public class RestServer {
         return null;
     }
 
+    void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
+        connectRestExtensions = herder.plugins().newPlugins(
+            config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+            config, ConnectRestExtension.class);
+
+        ConnectRestExtensionContext connectRestExtensionContext =
+            new ConnectRestExtensionContextImpl(
+                new ConnectRestConfigurable(resourceConfig),
+                new ConnectClusterStateImpl(herder)
+            );
+        for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
+            connectRestExtension.register(connectRestExtensionContext);
+        }
+
+    }
+
     public static String urlJoin(String base, String path) {
         if (base.endsWith("/") && path.startsWith("/"))
             return base + path.substring(1);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 25c2cb1..ca26e4e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -71,7 +71,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*",
+                  "org.apache.kafka.connect.runtime.isolation.*"})
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
     private static final String TOPIC = "topic";
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 877fe6b..5c8aa29 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -24,6 +24,8 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.storage.Converter;
@@ -37,6 +39,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -142,6 +145,26 @@ public class PluginsTest {
     }
 
     @Test
+    public void shouldInstantiateAndConfigureConnectRestExtension() {
+        props.put(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG,
+                  TestConnectRestExtension.class.getName());
+        createConfig();
+
+        List<ConnectRestExtension> connectRestExtensions =
+            plugins.newPlugins(config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
+                               config,
+                               ConnectRestExtension.class);
+        assertNotNull(connectRestExtensions);
+        assertEquals("One Rest Extension expected", 1, connectRestExtensions.size());
+        assertNotNull(connectRestExtensions.get(0));
+        assertTrue("Should be instance of TestConnectRestExtension",
+                   connectRestExtensions.get(0) instanceof TestConnectRestExtension);
+        assertNotNull(((TestConnectRestExtension) connectRestExtensions.get(0)).configs);
+        assertEquals(config.originals(),
+                     ((TestConnectRestExtension) connectRestExtensions.get(0)).configs);
+    }
+
+    @Test
     public void shouldInstantiateAndConfigureDefaultHeaderConverter() {
         props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
         createConfig();
@@ -243,6 +266,30 @@ public class PluginsTest {
         }
     }
 
+
+    public static class TestConnectRestExtension implements ConnectRestExtension {
+
+        public Map<String, ?> configs;
+
+        @Override
+        public void register(ConnectRestExtensionContext restPluginContext) {
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+        }
+
+        @Override
+        public String version() {
+            return "test";
+        }
+    }
+
     public static class TestInternalConverter extends JsonConverter {
         public Map<String, ?> configs;
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index d26aa04..2f8704a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -17,9 +17,11 @@
 package org.apache.kafka.connect.runtime.rest;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.util.Callback;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -32,18 +34,20 @@ import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.Invocation;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.Response;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
 import static org.junit.Assert.assertEquals;
 
 @RunWith(PowerMockRunner.class)
@@ -51,6 +55,8 @@ public class RestServerTest {
 
     @MockStrict
     private Herder herder;
+    @MockStrict
+    private Plugins plugins;
     private RestServer server;
 
     @After
@@ -151,8 +157,19 @@ public class RestServerTest {
 
     public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) {
         // To be able to set the Origin, we need to toggle this flag
+
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
+        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
         System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
 
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.EMPTY_LIST,
+                                           workerConfig,
+                                           ConnectRestExtension.class))
+            .andStubReturn(Collections.EMPTY_LIST);
+
         final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
         herder.connectors(EasyMock.capture(connectorsCallback));
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@@ -165,10 +182,7 @@ public class RestServerTest {
 
         PowerMock.replayAll();
 
-        Map<String, String> workerProps = baseWorkerProps();
-        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
-        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
-        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
         server = new RestServer(workerConfig);
         server.start(herder);
 
diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
new file mode 100644
index 0000000..0a1ef88
--- /dev/null
+++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConnectRestExtension
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 7082ddd..bbcdc31 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,4 +17,4 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scal
         'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
         'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
         'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
-        'jmh-benchmarks'
+        'connect:basic-auth-extension', 'jmh-benchmarks'

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.