You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/06/04 02:33:24 UTC

[kafka] branch 2.2 updated: KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST API (#6791)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 8ea05ee  KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST API (#6791)
8ea05ee is described below

commit 8ea05ee01ba41eeb4d68b0e7549cbdc07283d13d
Author: Hai-Dang Dam <da...@gmail.com>
AuthorDate: Mon Jun 3 19:06:00 2019 -0700

    KAFKA-8404: Add HttpHeader to RestClient HTTP Request and Connector REST API (#6791)
    
    When Connect forwards a REST request from one worker to another, the Authorization header is not forwarded. This commit changes the Connect framework to include the Authorization header when forwarding requests to other workers.
    
    Author: Hai-Dang Dam <da...@gmail.com>
    Reviewers: Robert Yokota <ra...@gmail.com>, Randall Hauch <rh...@gmail.com>
    
    # Conflicts:
    #	connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
    #	connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
---
 .../basic/auth/extension/JaasBasicAuthFilter.java  |  16 +--
 .../auth/extension/JaasBasicAuthFilterTest.java    |  34 +++++-
 .../runtime/distributed/DistributedHerder.java     |   2 +-
 .../kafka/connect/runtime/rest/RestClient.java     |  21 +++-
 .../runtime/rest/resources/ConnectorsResource.java |  49 +++++---
 .../resources/ConnectorPluginsResourceTest.java    |   3 +-
 .../rest/resources/ConnectorsResourceTest.java     | 123 ++++++++++++++-------
 7 files changed, 177 insertions(+), 71 deletions(-)

diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
index 6167434..d5b15c6 100644
--- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
+++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.connect.rest.basic.auth.extension;
 
+import java.util.regex.Pattern;
+import javax.ws.rs.HttpMethod;
 import org.apache.kafka.common.config.ConfigException;
 
 import java.io.IOException;
@@ -35,18 +37,18 @@ import javax.ws.rs.container.ContainerRequestFilter;
 import javax.ws.rs.core.Response;
 
 public class JaasBasicAuthFilter implements ContainerRequestFilter {
-
     private static final String CONNECT_LOGIN_MODULE = "KafkaConnect";
     static final String AUTHORIZATION = "Authorization";
-
+    private static final Pattern TASK_REQUEST_PATTERN = Pattern.compile("/?connectors/([^/]+)/tasks/?");
     @Override
     public void filter(ContainerRequestContext requestContext) throws IOException {
-
         try {
-            LoginContext loginContext =
-                new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
-                    requestContext.getHeaderString(AUTHORIZATION)));
-            loginContext.login();
+            if (!(requestContext.getMethod().equals(HttpMethod.POST) && TASK_REQUEST_PATTERN.matcher(requestContext.getUriInfo().getPath()).matches())) {
+                LoginContext loginContext =
+                    new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
+                        requestContext.getHeaderString(AUTHORIZATION)));
+                loginContext.login();
+            }
         } catch (LoginException | ConfigException e) {
             requestContext.abortWith(
                 Response.status(Response.Status.UNAUTHORIZED)
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
index 835bef0..fe5f8b9 100644
--- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.kafka.connect.rest.basic.auth.extension;
 
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.UriInfo;
 import org.apache.kafka.common.security.JaasUtils;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -52,6 +55,9 @@ public class JaasBasicAuthFilterTest {
     private String previousJaasConfig;
     private Configuration previousConfiguration;
 
+    @MockStrict
+    private UriInfo uriInfo;
+
     @Before
     public void setup() {
         EasyMock.reset(requestContext);
@@ -137,7 +143,34 @@ public class JaasBasicAuthFilterTest {
         jaasBasicAuthFilter.filter(requestContext);
     }
 
+    @Test
+    public void testPostWithoutAppropriateCredential() throws IOException {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
+        EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
+        EasyMock.expect(uriInfo.getPath()).andReturn("connectors/connName/tasks");
+
+        PowerMock.replayAll();
+        jaasBasicAuthFilter.filter(requestContext);
+        EasyMock.verify(requestContext);
+    }
+
+    @Test
+    public void testPostNotChangingConnectorTask() throws IOException {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
+        EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
+        EasyMock.expect(uriInfo.getPath()).andReturn("local:randomport/connectors/connName");
+        String authHeader = "Basic" + Base64.getEncoder().encodeToString(("user" + ":" + "password").getBytes());
+        EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
+            .andReturn(authHeader);
+        requestContext.abortWith(EasyMock.anyObject(Response.class));
+        EasyMock.expectLastCall();
+        PowerMock.replayAll();
+        jaasBasicAuthFilter.filter(requestContext);
+        EasyMock.verify(requestContext);
+    }
+
     private void setMock(String authorization, String username, String password, boolean exceptionCase) {
+        EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.GET);
         String authHeader = authorization + " " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
         EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
             .andReturn(authHeader);
@@ -152,7 +185,6 @@ public class JaasBasicAuthFilterTest {
         File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
         jaasConfigFile.deleteOnExit();
         previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
-
         List<String> lines;
         lines = new ArrayList<>();
         lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required ");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 711b6c9..b229102 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1031,7 +1031,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                                     return;
                                 }
                                 String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
-                                RestClient.httpRequest(reconfigUrl, "POST", rawTaskProps, null, config);
+                                RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config);
                                 cb.onCompletion(null, null);
                             } catch (ConnectException e) {
                                 log.error("Request to leader to reconfigure connector tasks failed", e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 15e8418..de11f26 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
@@ -50,12 +51,13 @@ public class RestClient {
      *
      * @param url             HTTP connection will be established with this url.
      * @param method          HTTP method ("GET", "POST", "PUT", etc.)
+     * @param headers         HTTP headers from REST endpoint
      * @param requestBodyData Object to serialize as JSON and send in the request body.
      * @param responseFormat  Expected format of the response to the HTTP request.
      * @param <T>             The type of the deserialized response to the HTTP request.
      * @return The deserialized response to the HTTP request, or null if no data is expected.
      */
-    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
+    public static <T> HttpResponse<T> httpRequest(String url, String method, HttpHeaders headers, Object requestBodyData,
                                                   TypeReference<T> responseFormat, WorkerConfig config) {
         HttpClient client;
 
@@ -82,6 +84,8 @@ public class RestClient {
             req.method(method);
             req.accept("application/json");
             req.agent("kafka-connect");
+            addHeadersToRequest(headers, req);
+
             if (serializedBody != null) {
                 req.content(new StringContentProvider(serializedBody, StandardCharsets.UTF_8), "application/json");
             }
@@ -116,6 +120,21 @@ public class RestClient {
         }
     }
 
+
+    /**
+     * Extract headers from REST call and add to client request
+     * @param headers         Headers from REST endpoint
+     * @param req             The client request to modify
+     */
+    private static void addHeadersToRequest(HttpHeaders headers, Request req) {
+        if (headers != null) {
+            String credentialAuthorization = headers.getHeaderString(HttpHeaders.AUTHORIZATION);
+            if (credentialAuthorization != null) {
+                req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization);
+            }
+        }
+    }
+
     /**
      * Convert response parameters from Jetty format (HttpFields)
      * @param httpFields
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 4a04512..26a09ea 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -44,6 +46,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
@@ -79,16 +82,18 @@ public class ConnectorsResource {
 
     @GET
     @Path("/")
-    public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable {
+    public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward,
+                                             final @Context HttpHeaders headers) throws Throwable {
         FutureCallback<Collection<String>> cb = new FutureCallback<>();
         herder.connectors(cb);
-        return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
+        return completeOrForwardRequest(cb, "/connectors", "GET", headers, null, new TypeReference<Collection<String>>() {
         }, forward);
     }
 
     @POST
     @Path("/")
     public Response createConnector(final @QueryParam("forward") Boolean forward,
+                                    final @Context HttpHeaders headers,
                                     final CreateConnectorRequest createRequest) throws Throwable {
         // Trim leading and trailing whitespaces from the connector name, replace null with empty string
         // if no name element present to keep validation within validator (NonEmptyStringWithoutControlChars
@@ -100,7 +105,7 @@ public class ConnectorsResource {
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         herder.putConnectorConfig(name, configs, false, cb);
-        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
+        Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
                 new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
 
         URI location = UriBuilder.fromUri("/connectors").path(name).build();
@@ -110,19 +115,21 @@ public class ConnectorsResource {
     @GET
     @Path("/{connector}")
     public ConnectorInfo getConnector(final @PathParam("connector") String connector,
+                                      final @Context HttpHeaders headers,
                                       final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
         herder.connectorInfo(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward);
+        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
     }
 
     @GET
     @Path("/{connector}/config")
     public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector,
+                                                  final @Context HttpHeaders headers,
                                                   final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
         herder.connectorConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward);
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
     }
 
     @GET
@@ -134,6 +141,7 @@ public class ConnectorsResource {
     @PUT
     @Path("/{connector}/config")
     public Response putConnectorConfig(final @PathParam("connector") String connector,
+                                       final @Context HttpHeaders headers,
                                        final @QueryParam("forward") Boolean forward,
                                        final Map<String, String> connectorConfig) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
@@ -141,7 +149,7 @@ public class ConnectorsResource {
 
         herder.putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
-                "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
+                "PUT", headers, connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
         Response.ResponseBuilder response;
         if (createdInfo.created()) {
             URI location = UriBuilder.fromUri("/connectors").path(connector).build();
@@ -155,15 +163,16 @@ public class ConnectorsResource {
     @POST
     @Path("/{connector}/restart")
     public void restartConnector(final @PathParam("connector") String connector,
+                                 final @Context HttpHeaders headers,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", headers, null, forward);
     }
 
     @PUT
     @Path("/{connector}/pause")
-    public Response pauseConnector(@PathParam("connector") String connector) {
+    public Response pauseConnector(@PathParam("connector") String connector, final @Context HttpHeaders headers) {
         herder.pauseConnector(connector);
         return Response.accepted().build();
     }
@@ -178,26 +187,29 @@ public class ConnectorsResource {
     @GET
     @Path("/{connector}/tasks")
     public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector,
+                                         final @Context HttpHeaders headers,
                                          final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
         herder.taskConfigs(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() {
         }, forward);
     }
 
     @POST
     @Path("/{connector}/tasks")
     public void putTaskConfigs(final @PathParam("connector") String connector,
+                               final @Context HttpHeaders headers,
                                final @QueryParam("forward") Boolean forward,
                                final List<Map<String, String>> taskConfigs) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.putTaskConfigs(connector, taskConfigs, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
     }
 
     @GET
     @Path("/{connector}/tasks/{task}/status")
     public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
+                                                      final @Context HttpHeaders headers,
                                                       final @PathParam("task") Integer task) throws Throwable {
         return herder.taskStatus(new ConnectorTaskId(connector, task));
     }
@@ -206,20 +218,22 @@ public class ConnectorsResource {
     @Path("/{connector}/tasks/{task}/restart")
     public void restartTask(final @PathParam("connector") String connector,
                             final @PathParam("task") Integer task,
+                            final @Context HttpHeaders headers,
                             final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
         herder.restartTask(taskId, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", headers, null, forward);
     }
 
     @DELETE
     @Path("/{connector}")
     public void destroyConnector(final @PathParam("connector") String connector,
+                                 final @Context HttpHeaders headers,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         herder.deleteConnectorConfig(connector, cb);
-        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
+        completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
     }
 
     // Check whether the connector name from the url matches the one (if there is one) provided in the connectorconfig
@@ -239,6 +253,7 @@ public class ConnectorsResource {
     private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
                                               String path,
                                               String method,
+                                              HttpHeaders headers,
                                               Object body,
                                               TypeReference<U> resultType,
                                               Translator<T, U> translator,
@@ -261,7 +276,7 @@ public class ConnectorsResource {
                             .build()
                             .toString();
                     log.debug("Forwarding request {} {} {}", forwardUrl, method, body);
-                    return translator.translate(RestClient.httpRequest(forwardUrl, method, body, resultType, config));
+                    return translator.translate(RestClient.httpRequest(forwardUrl, method, headers, body, resultType, config));
                 } else {
                     // we should find the right target for the query within two hops, so if
                     // we don't, it probably means that a rebalance has taken place.
@@ -283,14 +298,14 @@ public class ConnectorsResource {
         }
     }
 
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, Object body,
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body,
                                            TypeReference<T> resultType, Boolean forward) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator<T>(), forward);
+        return completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<T>(), forward);
     }
 
-    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method,
+    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers,
                                            Object body, Boolean forward) throws Throwable {
-        return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator<T>(), forward);
+        return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<T>(), forward);
     }
 
     private interface Translator<T, U> {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index a3aee6a..2882355 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -182,7 +183,7 @@ public class ConnectorPluginsResourceTest {
     @Before
     public void setUp() throws Exception {
         PowerMock.mockStatic(RestClient.class,
-                RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
+                RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class));
 
         plugins = PowerMock.createMock(Plugins.class);
         herder = PowerMock.createMock(AbstractHerder.class);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index f84cd25..ba5a2c3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
+import javax.ws.rs.core.HttpHeaders;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -75,6 +76,7 @@ public class ConnectorsResourceTest {
     private static final String CONNECTOR_NAME_PADDING_WHITESPACES = "   " + CONNECTOR_NAME + "  \n  ";
     private static final Boolean FORWARD = true;
     private static final Map<String, String> CONNECTOR_CONFIG_SPECIAL_CHARS = new HashMap<>();
+    private static final HttpHeaders NULL_HEADERS = null;
     static {
         CONNECTOR_CONFIG_SPECIAL_CHARS.put("name", CONNECTOR_NAME_SPECIAL_CHARS);
         CONNECTOR_CONFIG_SPECIAL_CHARS.put("sample_config", "test_config");
@@ -125,7 +127,7 @@ public class ConnectorsResourceTest {
     @Before
     public void setUp() throws NoSuchMethodException {
         PowerMock.mockStatic(RestClient.class,
-                RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
+                RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class));
         connectorsResource = new ConnectorsResource(herder, null);
     }
 
@@ -142,7 +144,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = connectorsResource.listConnectors(FORWARD);
+        Collection<String> connectors = connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
         // Ordering isn't guaranteed, compare sets
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
 
@@ -156,15 +158,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("GET"),
-                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.isNull(), EasyMock.isNull(), EasyMock.anyObject(TypeReference.class), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(200, new HashMap<String, String>(), Arrays.asList(CONNECTOR2_NAME, CONNECTOR_NAME)));
 
         PowerMock.replayAll();
 
-        Collection<String> connectors = connectorsResource.listConnectors(FORWARD);
-        // Ordering isn't guaranteed, compare sets
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, CONNECTOR2_NAME)), new HashSet<>(connectors));
-
+        Collection<String> connectors = connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
         PowerMock.verifyAll();
     }
 
@@ -177,7 +176,7 @@ public class ConnectorsResourceTest {
         PowerMock.replayAll();
 
         // throws
-        connectorsResource.listConnectors(FORWARD);
+        connectorsResource.listConnectors(FORWARD, NULL_HEADERS);
     }
 
     @Test
@@ -191,7 +190,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
     }
@@ -204,19 +203,57 @@ public class ConnectorsResourceTest {
         herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+        EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors?forward=false"), EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.eq(body), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(201, new HashMap<String, String>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES,
                     ConnectorType.SOURCE)));
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
 
 
     }
 
+    @Test
+    public void testCreateConnectorWithHeaderAuthorization() throws Throwable { // createConnector with request headers that carry a Basic Authorization value
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class); // created directly through EasyMock rather than through PowerMock
+        EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn("Basic YWxhZGRpbjpvcGVuc2VzYW1l").times(1);
+        EasyMock.replay(httpHeaders); // NOTE(review): no EasyMock.verify(httpHeaders) follows, so the times(1) expectation looks unenforced - confirm whether PowerMock.verifyAll() covers this mock
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); // callback is primed with a Created result; no RestClient.httpRequest expectation is recorded in this test
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(FORWARD, httpHeaders, body);
+
+        PowerMock.verifyAll();
+    }
+
+
+
+    @Test
+    public void testCreateConnectorWithoutHeaderAuthorization() throws Throwable { // createConnector with request headers whose Authorization lookup yields null
+        CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
+        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = Capture.newInstance();
+        HttpHeaders httpHeaders = EasyMock.mock(HttpHeaders.class); // created directly through EasyMock rather than through PowerMock
+        EasyMock.expect(httpHeaders.getHeaderString("Authorization")).andReturn(null).times(1); // models a request that carries no Authorization header
+        EasyMock.replay(httpHeaders); // NOTE(review): no EasyMock.verify(httpHeaders) follows, so the times(1) expectation looks unenforced - confirm whether PowerMock.verifyAll() covers this mock
+        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
+        expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
+            CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))); // callback is primed with a Created result; no RestClient.httpRequest expectation is recorded in this test
+
+        PowerMock.replayAll();
+
+        connectorsResource.createConnector(FORWARD, httpHeaders, body);
+
+        PowerMock.verifyAll();
+    }
+
     @Test(expected = AlreadyExistsException.class)
     public void testCreateConnectorExists() throws Throwable {
         CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
@@ -227,7 +264,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, body);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
 
         PowerMock.verifyAll();
     }
@@ -246,7 +283,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -265,7 +302,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -284,7 +321,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.createConnector(FORWARD, bodyIn);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
 
         PowerMock.verifyAll();
     }
@@ -297,7 +334,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -308,12 +345,12 @@ public class ConnectorsResourceTest {
         herder.deleteConnectorConfig(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
         expectAndCallbackNotLeaderException(cb);
         // Should forward request
-        EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", null, null, null))
+        EasyMock.expect(RestClient.httpRequest("http://leader:8083/connectors/" + CONNECTOR_NAME + "?forward=false", "DELETE", NULL_HEADERS, null, null, null))
                 .andReturn(new RestClient.HttpResponse<>(204, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -327,7 +364,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.destroyConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.destroyConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -341,7 +378,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME, FORWARD);
+        ConnectorInfo connInfo = connectorsResource.getConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
         assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE),
             connInfo);
 
@@ -356,7 +393,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
+        Map<String, String> connConfig = connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
         assertEquals(CONNECTOR_CONFIG, connConfig);
 
         PowerMock.verifyAll();
@@ -370,7 +407,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.getConnectorConfig(CONNECTOR_NAME, FORWARD);
+        connectorsResource.getConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -384,7 +421,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, CONNECTOR_CONFIG);
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG);
 
         PowerMock.verifyAll();
     }
@@ -400,7 +437,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString();
+        String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS, decoded);
 
@@ -418,7 +455,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.createConnector(FORWARD, body).getLocation().toString();
+        String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
 
@@ -435,7 +472,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS, FORWARD, CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
+        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_SPECIAL_CHARS, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_SPECIAL_CHARS).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_SPECIAL_CHARS, decoded);
 
@@ -452,7 +489,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1, FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
+        String rspLocation = connectorsResource.putConnectorConfig(CONNECTOR_NAME_CONTROL_SEQUENCES1, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_CONTROL_SEQUENCES).getLocation().toString();
         String decoded = new URI(rspLocation).getPath();
         Assert.assertEquals("/connectors/" + CONNECTOR_NAME_CONTROL_SEQUENCES1, decoded);
 
@@ -463,7 +500,7 @@ public class ConnectorsResourceTest {
     public void testPutConnectorConfigNameMismatch() throws Throwable {
         Map<String, String> connConfig = new HashMap<>(CONNECTOR_CONFIG);
         connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
-        connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, connConfig);
+        connectorsResource.putConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, connConfig);
     }
 
     @Test(expected = BadRequestException.class)
@@ -471,7 +508,7 @@ public class ConnectorsResourceTest {
         Map<String, String> connConfig = new HashMap<>();
         connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
         CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig);
-        connectorsResource.createConnector(FORWARD, request);
+        connectorsResource.createConnector(FORWARD, NULL_HEADERS, request);
     }
 
     @Test
@@ -482,7 +519,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
+        List<TaskInfo> taskInfos = connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
         assertEquals(TASK_INFOS, taskInfos);
 
         PowerMock.verifyAll();
@@ -496,7 +533,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.getTaskConfigs(CONNECTOR_NAME, FORWARD);
+        connectorsResource.getTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -509,7 +546,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, TASK_CONFIGS);
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, TASK_CONFIGS);
 
         PowerMock.verifyAll();
     }
@@ -522,7 +559,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, FORWARD, TASK_CONFIGS);
+        connectorsResource.putTaskConfigs(CONNECTOR_NAME, NULL_HEADERS, FORWARD, TASK_CONFIGS);
 
         PowerMock.verifyAll();
     }
@@ -535,7 +572,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, FORWARD);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -547,12 +584,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, null);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, null);
 
         PowerMock.verifyAll();
     }
@@ -565,12 +602,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
 
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartConnector(CONNECTOR_NAME, true);
+        connectorsResource.restartConnector(CONNECTOR_NAME, NULL_HEADERS, true);
 
         PowerMock.verifyAll();
     }
@@ -584,7 +621,7 @@ public class ConnectorsResourceTest {
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, FORWARD);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, FORWARD);
 
         PowerMock.verifyAll();
     }
@@ -598,12 +635,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackNotLeaderException(cb);
 
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://leader:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=true"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, null);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, null);
 
         PowerMock.verifyAll();
     }
@@ -618,12 +655,12 @@ public class ConnectorsResourceTest {
         expectAndCallbackException(cb, new NotAssignedException("not owner test", ownerUrl));
 
         EasyMock.expect(RestClient.httpRequest(EasyMock.eq("http://owner:8083/connectors/" + CONNECTOR_NAME + "/tasks/0/restart?forward=false"),
-                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
+                EasyMock.eq("POST"), EasyMock.isNull(), EasyMock.isNull(), EasyMock.<TypeReference>anyObject(), EasyMock.anyObject(WorkerConfig.class)))
                 .andReturn(new RestClient.HttpResponse<>(202, new HashMap<String, String>(), null));
 
         PowerMock.replayAll();
 
-        connectorsResource.restartTask(CONNECTOR_NAME, 0, true);
+        connectorsResource.restartTask(CONNECTOR_NAME, 0, NULL_HEADERS, true);
 
         PowerMock.verifyAll();
     }