You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/20 02:39:56 UTC

kafka git commit: KAFKA-3424: Add CORS support to Connect REST API

Repository: kafka
Updated Branches:
  refs/heads/trunk 655367971 -> eb823281a


KAFKA-3424: Add CORS support to Connect REST API

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira

Closes #1099 from ewencp/cors-rest-support


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb823281
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb823281
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb823281

Branch: refs/heads/trunk
Commit: eb823281a52f3b27c3a889e7412bc07b3024e688
Parents: 6553679
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Sat Mar 19 18:39:52 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Sat Mar 19 18:39:52 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../kafka/connect/runtime/WorkerConfig.java     |  14 +-
 .../kafka/connect/runtime/rest/RestServer.java  |  12 ++
 .../connect/runtime/rest/RestServerTest.java    | 150 +++++++++++++++++++
 gradle/dependencies.gradle                      |   1 +
 5 files changed, 177 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 81e4af5..c29ad5a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -749,6 +749,7 @@ project(':connect:runtime') {
     compile libs.jerseyContainerServlet
     compile libs.jettyServer
     compile libs.jettyServlet
+    compile libs.jettyServlets
     compile libs.reflections
 
     testCompile project(':clients').sourceSets.test.output

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 4ecacbb..471e4a5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -101,6 +101,15 @@ public class WorkerConfig extends AbstractConfig {
     private static final String REST_ADVERTISED_PORT_DOC
             = "If this is set, this is the port that will be given out to other workers to connect to.";
 
+    public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
+    protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
+            "Value to set the Access-Control-Allow-Origin header to for REST API requests." +
+                    " To enable cross origin access, set this to the domain of the application that should be permitted" +
+                    " to access the API, or '*' to allow access from any domain. The default value only allows access" +
+                    " from the domain of the REST API.";
+    protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";
+
+
     /**
      * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
      * bootstrap their own ConfigDef.
@@ -129,7 +138,10 @@ public class WorkerConfig extends AbstractConfig {
                 .define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
                 .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
                 .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING,  null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
-                .define(REST_ADVERTISED_PORT_CONFIG, Type.INT,  null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
+                .define(REST_ADVERTISED_PORT_CONFIG, Type.INT,  null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
+                .define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
+                        ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
+                        ACCESS_CONTROL_ALLOW_ORIGIN_DOC);
     }
 
     public WorkerConfig(ConfigDef definition, Map<String, String> props) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 7e4279a..1505a01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -39,8 +39,10 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.CrossOriginFilter;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
@@ -52,9 +54,11 @@ import java.io.OutputStream;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
+import javax.servlet.DispatcherType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
 
@@ -109,6 +113,14 @@ public class RestServer {
         context.setContextPath("/");
         context.addServlet(servletHolder, "/*");
 
+        String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
+        if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
+            FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
+            filterHolder.setName("cross-origin");
+            filterHolder.setInitParameter("allowedOrigins", allowedOrigins);
+            context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
+        }
+
         RequestLogHandler requestLogHandler = new RequestLogHandler();
         Slf4jRequestLog requestLog = new Slf4jRequestLog();
         requestLog.setLoggerName(RestServer.class.getCanonicalName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
new file mode 100644
index 0000000..8e9d52b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+public class RestServerTest {
+
+    @MockStrict
+    private Herder herder;
+    private RestServer server;
+
+    @After
+    public void tearDown() {
+        server.stop();
+    }
+
+    private Map<String, String> baseWorkerProps() {
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        return workerProps;
+    }
+
+    @Test
+    public void testCORSEnabled() {
+        checkCORSRequest("*", "http://bar.com", "http://bar.com");
+    }
+
+    @Test
+    public void testCORSDisabled() {
+        checkCORSRequest("", "http://bar.com", null);
+    }
+
+    public void checkCORSRequest(String corsDomain, String origin, String expectedHeader) {
+        // To be able to set the Origin, we need to toggle this flag
+        System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
+
+        final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
+        herder.connectors(EasyMock.capture(connectorsCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b"));
+                return null;
+            }
+        });
+        PowerMock.replayAll();
+
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+        server = new RestServer(workerConfig);
+        server.start(herder);
+
+        Response response = request("/connectors")
+                .header("Referer", origin + "/page")
+                .header("Origin", origin)
+                .get();
+        assertEquals(200, response.getStatus());
+
+        assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin"));
+        PowerMock.verifyAll();
+    }
+
+    protected Invocation.Builder request(String path) {
+        return request(path, null, null, null);
+    }
+
+    protected Invocation.Builder request(String path, Map<String, String> queryParams) {
+        return request(path, null, null, queryParams);
+    }
+
+    protected Invocation.Builder request(String path, String templateName, Object templateValue) {
+        return request(path, templateName, templateValue, null);
+    }
+
+    protected Invocation.Builder request(String path, String templateName, Object templateValue,
+                                         Map<String, String> queryParams) {
+        Client client = ClientBuilder.newClient();
+        WebTarget target;
+        URI pathUri = null;
+        try {
+            pathUri = new URI(path);
+        } catch (URISyntaxException e) {
+            // Ignore, fall back to the server's advertised URL and assume this is a valid path part
+        }
+        if (pathUri != null && pathUri.isAbsolute()) {
+            target = client.target(path);
+        } else {
+            target = client.target(server.advertisedUrl()).path(path);
+        }
+        if (templateName != null && templateValue != null) {
+            target = target.resolveTemplate(templateName, templateValue);
+        }
+        if (queryParams != null) {
+            for (Map.Entry<String, String> queryParam : queryParams.entrySet()) {
+                target = target.queryParam(queryParam.getKey(), queryParam.getValue());
+            }
+        }
+        return target.request();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index aa1d3f9..47158d6 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -72,6 +72,7 @@ libs += [
   jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
   jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty",
   jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty",
+  jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
   jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
   junit: "junit:junit:$versions.junit",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",