You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/20 02:39:56 UTC
kafka git commit: KAFKA-3424: Add CORS support to Connect REST API
Repository: kafka
Updated Branches:
refs/heads/trunk 655367971 -> eb823281a
KAFKA-3424: Add CORS support to Connect REST API
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #1099 from ewencp/cors-rest-support
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb823281
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb823281
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb823281
Branch: refs/heads/trunk
Commit: eb823281a52f3b27c3a889e7412bc07b3024e688
Parents: 6553679
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Sat Mar 19 18:39:52 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Sat Mar 19 18:39:52 2016 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../kafka/connect/runtime/WorkerConfig.java | 14 +-
.../kafka/connect/runtime/rest/RestServer.java | 12 ++
.../connect/runtime/rest/RestServerTest.java | 150 +++++++++++++++++++
gradle/dependencies.gradle | 1 +
5 files changed, 177 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 81e4af5..c29ad5a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -749,6 +749,7 @@ project(':connect:runtime') {
compile libs.jerseyContainerServlet
compile libs.jettyServer
compile libs.jettyServlet
+ compile libs.jettyServlets
compile libs.reflections
testCompile project(':clients').sourceSets.test.output
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 4ecacbb..471e4a5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -101,6 +101,15 @@ public class WorkerConfig extends AbstractConfig {
private static final String REST_ADVERTISED_PORT_DOC
= "If this is set, this is the port that will be given out to other workers to connect to.";
+ public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
+ protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
+ "Value to set the Access-Control-Allow-Origin header to for REST API requests. " +
+ "To enable cross origin access, set this to the domain of the application that should be permitted" +
+ " to access the API, or '*' to allow access from any domain. The default value only allows access" +
+ " from the domain of the REST API.";
+ protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";
+
+
/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
@@ -129,7 +138,10 @@ public class WorkerConfig extends AbstractConfig {
.define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
.define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
.define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
- .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
+ .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
+ .define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
+ ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
+ ACCESS_CONTROL_ALLOW_ORIGIN_DOC);
}
public WorkerConfig(ConfigDef definition, Map<String, String> props) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 7e4279a..1505a01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -39,8 +39,10 @@ import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
+import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.servlets.CrossOriginFilter;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
@@ -52,9 +54,11 @@ import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import javax.servlet.DispatcherType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
@@ -109,6 +113,14 @@ public class RestServer {
context.setContextPath("/");
context.addServlet(servletHolder, "/*");
+ String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
+ if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
+ FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
+ filterHolder.setName("cross-origin");
+ filterHolder.setInitParameter("allowedOrigins", allowedOrigins);
+ context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
+ }
+
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setLoggerName(RestServer.class.getCanonicalName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
new file mode 100644
index 0000000..8e9d52b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+public class RestServerTest {
+
+ @MockStrict
+ private Herder herder;
+ private RestServer server;
+
+ @After
+ public void tearDown() {
+ server.stop();
+ }
+
+ private Map<String, String> baseWorkerProps() {
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("internal.key.converter.schemas.enable", "false");
+ workerProps.put("internal.value.converter.schemas.enable", "false");
+ workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ return workerProps;
+ }
+
+ @Test
+ public void testCORSEnabled() {
+ checkCORSRequest("*", "http://bar.com", "http://bar.com");
+ }
+
+ @Test
+ public void testCORSDisabled() {
+ checkCORSRequest("", "http://bar.com", null);
+ }
+
+ public void checkCORSRequest(String corsDomain, String origin, String expectedHeader) {
+ // To be able to set the Origin, we need to toggle this flag
+ System.setProperty("sun.net.http.allowRestrictedHeaders", "true");
+
+ final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
+ herder.connectors(EasyMock.capture(connectorsCallback));
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b"));
+ return null;
+ }
+ });
+ PowerMock.replayAll();
+
+ Map<String, String> workerProps = baseWorkerProps();
+ workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
+ WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+ server = new RestServer(workerConfig);
+ server.start(herder);
+
+ Response response = request("/connectors")
+ .header("Referer", origin + "/page")
+ .header("Origin", origin)
+ .get();
+ assertEquals(200, response.getStatus());
+
+ assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin"));
+ PowerMock.verifyAll();
+ }
+
+ protected Invocation.Builder request(String path) {
+ return request(path, null, null, null);
+ }
+
+ protected Invocation.Builder request(String path, Map<String, String> queryParams) {
+ return request(path, null, null, queryParams);
+ }
+
+ protected Invocation.Builder request(String path, String templateName, Object templateValue) {
+ return request(path, templateName, templateValue, null);
+ }
+
+ protected Invocation.Builder request(String path, String templateName, Object templateValue,
+ Map<String, String> queryParams) {
+ Client client = ClientBuilder.newClient();
+ WebTarget target;
+ URI pathUri = null;
+ try {
+ pathUri = new URI(path);
+ } catch (URISyntaxException e) {
+ // Ignore: not a parseable URI, so treat it as a path relative to the server's advertised URL
+ }
+ if (pathUri != null && pathUri.isAbsolute()) {
+ target = client.target(path);
+ } else {
+ target = client.target(server.advertisedUrl()).path(path);
+ }
+ if (templateName != null && templateValue != null) {
+ target = target.resolveTemplate(templateName, templateValue);
+ }
+ if (queryParams != null) {
+ for (Map.Entry<String, String> queryParam : queryParams.entrySet()) {
+ target = target.queryParam(queryParam.getKey(), queryParam.getValue());
+ }
+ }
+ return target.request();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb823281/gradle/dependencies.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index aa1d3f9..47158d6 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -72,6 +72,7 @@ libs += [
jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty",
jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty",
+ jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
junit: "junit:junit:$versions.junit",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",