You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/05/24 13:57:18 UTC
[kafka] branch trunk updated: KAFKA-9944: Added supporting
customized HTTP response headers for Kafka Connect. (#8620)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2988eac KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect. (#8620)
2988eac is described below
commit 2988eac0822022578bd2c6c5626bfbda3e8c73a6
Author: Jeff Huang <47...@users.noreply.github.com>
AuthorDate: Sun May 24 06:56:27 2020 -0700
KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect. (#8620)
Added support for customizing the HTTP response headers for Kafka Connect as described in KIP-577.
Author: Jeff Huang <je...@confluent.io>
Reviewer: Randall Hauch <rh...@gmail.com>
---
checkstyle/import-control.xml | 1 +
checkstyle/suppressions.xml | 3 +
.../apache/kafka/connect/runtime/WorkerConfig.java | 75 +++++++++++++++++++++-
.../kafka/connect/runtime/rest/RestServer.java | 16 +++++
.../kafka/connect/runtime/WorkerConfigTest.java | 48 ++++++++++++++
.../kafka/connect/runtime/rest/RestServerTest.java | 62 +++++++++++++++++-
6 files changed, 202 insertions(+), 3 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3fd5ea7..7119265 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -362,6 +362,7 @@
<allow pkg="org.reflections"/>
<allow pkg="org.reflections.util"/>
<allow pkg="javax.crypto"/>
+ <allow pkg="org.eclipse.jetty.util" />
<subpackage name="rest">
<allow pkg="org.eclipse.jetty" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f86cc5f..74e6961 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -144,6 +144,9 @@
<suppress checks="MethodLength"
files="Values.java"/>
+ <suppress checks="NPathComplexity"
+ files="RestServer.java"/>
+
<!-- connect tests-->
<suppress checks="ClassDataAbstractionCoupling"
files="(DistributedHerder|KafkaBasedLog)Test.java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 347e250..7a4f04e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -34,11 +34,14 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
+import org.eclipse.jetty.util.StringUtil;
+
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
@@ -49,6 +52,9 @@ public class WorkerConfig extends AbstractConfig {
private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class);
private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
+ private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
+ Arrays.asList("set", "add", "setDate", "addDate")
+ );
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOTSTRAP_SERVERS_DOC
@@ -244,6 +250,10 @@ public class WorkerConfig extends AbstractConfig {
+ "user requests to reset the set of active topics per connector.";
protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
+ public static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config";
+ public static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
+ public static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
+
/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
@@ -324,7 +334,9 @@ public class WorkerConfig extends AbstractConfig {
.define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT,
Importance.LOW, TOPIC_TRACKING_ENABLE_DOC)
.define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
- Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC);
+ Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC)
+ .define(RESPONSE_HTTP_HEADERS_CONFIG, Type.STRING, RESPONSE_HTTP_HEADERS_DEFAULT,
+ new ResponseHttpHeadersValidator(), Importance.LOW, RESPONSE_HTTP_HEADERS_DOC);
}
private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
@@ -400,6 +412,48 @@ public class WorkerConfig extends AbstractConfig {
logInternalConverterDeprecationWarnings(props);
}
+ // Visible for testing
+ static void validateHttpResponseHeaderConfig(String config) {
+ try {
+ // validate format
+ String[] configTokens = config.trim().split("\\s+", 2);
+ if (configTokens.length != 2) {
+ throw new ConfigException(String.format("Invalid format of header config '%s\'. "
+ + "Expected: '[ation] [header name]:[header value]'", config));
+ }
+
+ // validate action
+ String method = configTokens[0].trim();
+ validateHeaderConfigAction(method);
+
+ // validate header name and header value pair
+ String header = configTokens[1];
+ String[] headerTokens = header.trim().split(":");
+ if (headerTokens.length != 2) {
+ throw new ConfigException(
+ String.format("Invalid format of header name and header value pair '%s'. "
+ + "Expected: '[header name]:[header value]'", header));
+ }
+
+ // validate header name
+ String headerName = headerTokens[0].trim();
+ if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) {
+ throw new ConfigException(String.format("Invalid header name '%s'. "
+ + "The '[header name]' cannot contain whitespace", headerName));
+ }
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new ConfigException(String.format("Invalid header config '%s'.", config), e);
+ }
+ }
+
+ // Visible for testing
+ static void validateHeaderConfigAction(String action) {
+ if (!HEADER_ACTIONS.stream().anyMatch(action::equalsIgnoreCase)) {
+ throw new ConfigException(String.format("Invalid header config action: '%s'. "
+ + "Expected one of %s", action, HEADER_ACTIONS));
+ }
+ }
+
private static class AdminListenersValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
@@ -427,4 +481,23 @@ public class WorkerConfig extends AbstractConfig {
}
}
+ private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ String strValue = (String) value;
+ if (strValue == null || strValue.trim().isEmpty()) {
+ return;
+ }
+
+ String[] configs = StringUtil.csvSplit(strValue); // handles and removed surrounding quotes
+ Arrays.stream(configs).forEach(WorkerConfig::validateHttpResponseHeaderConfig);
+ }
+
+ @Override
+ public String toString() {
+ return "Comma-separated header rules, where each header rule is of the form "
+ + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+ + "if any part of a header rule contains a comma";
+ }
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 02b4677..408f72e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -46,6 +46,7 @@ import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.CrossOriginFilter;
+import org.eclipse.jetty.servlets.HeaderFilter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;
@@ -285,6 +286,11 @@ public class RestServer {
context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
}
+ String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+ if (headerConfig != null && !headerConfig.trim().isEmpty()) {
+ configureHttpResponsHeaderFilter(context);
+ }
+
RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLogWriter slf4jRequestLogWriter = new Slf4jRequestLogWriter();
slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
@@ -472,4 +478,14 @@ public class RestServer {
return base + path;
}
+ /**
+ * Register header filter to ServletContextHandler.
+ * @param context The serverlet context handler
+ */
+ protected void configureHttpResponsHeaderFilter(ServletContextHandler context) {
+ String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+ FilterHolder headerFilterHolder = new FilterHolder(HeaderFilter.class);
+ headerFilterHolder.setInitParameter("headerConfig", headerConfig);
+ context.addFilter(headerFilterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
index 33416b9..1eeb13e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
@@ -23,12 +23,38 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
public class WorkerConfigTest {
+ private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
+ "add \t Cache-Control: no-cache, no-store, must-revalidate",
+ "add \r X-XSS-Protection: 1; mode=block",
+ "\n add Strict-Transport-Security: max-age=31536000; includeSubDomains",
+ "AdD Strict-Transport-Security: \r max-age=31536000; includeSubDomains",
+ "AdD \t Strict-Transport-Security : \n max-age=31536000; includeSubDomains",
+ "add X-Content-Type-Options: \r nosniff",
+ "Set \t X-Frame-Options: \t Deny\n ",
+ "seT \t X-Cache-Info: \t not cacheable\n ",
+ "seTDate \t Expires: \r 31540000000",
+ "adDdate \n Last-Modified: \t 0"
+ );
+
+ private static final List<String> INVALID_HEADER_CONFIGS = Arrays.asList(
+ "set \t",
+ "badaction \t X-Frame-Options:DENY",
+ "set add X-XSS-Protection:1",
+ "addX-XSS-Protection",
+ "X-XSS-Protection:",
+ "add set X-XSS-Protection: 1",
+ "add X-XSS-Protection:1 X-XSS-Protection:1 ",
+ "add X-XSS-Protection",
+ "set X-Frame-Options:DENY, add :no-cache, no-store, must-revalidate "
+ );
@Test
public void testAdminListenersConfigAllowedValues() {
@@ -63,6 +89,28 @@ public class WorkerConfigTest {
new WorkerConfig(WorkerConfig.baseConfigDef(), props);
}
+ @Test
+ public void testInvalidHeaderConfigs() {
+ for (String config : INVALID_HEADER_CONFIGS) {
+ assertInvalidHeaderConfig(config);
+ }
+ }
+
+ @Test
+ public void testValidHeaderConfigs() {
+ for (String config : VALID_HEADER_CONFIGS) {
+ assertValidHeaderConfig(config);
+ }
+ }
+
+ private void assertInvalidHeaderConfig(String config) {
+ assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config));
+ }
+
+ private void assertValidHeaderConfig(String config) {
+ WorkerConfig.validateHttpResponseHeaderConfig(config);
+ }
+
private Map<String, String> baseProps() {
Map<String, String> props = new HashMap<>();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 575c4da..0c81ddd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -65,7 +65,6 @@ import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.net.ssl.*", "javax.security.*", "javax.crypto.*"})
public class RestServerTest {
-
@MockStrict
private Herder herder;
@MockStrict
@@ -76,7 +75,9 @@ public class RestServerTest {
@After
public void tearDown() {
- server.stop();
+ if (server != null) {
+ server.stop();
+ }
}
@SuppressWarnings("deprecation")
@@ -400,6 +401,63 @@ public class RestServerTest {
Assert.assertEquals(404, response.getStatusLine().getStatusCode());
}
+ @Test
+ public void testValidCustomizedHttpResponseHeaders() throws IOException {
+ String headerConfig =
+ "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+ Map<String, String> expectedHeaders = new HashMap<>();
+ expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+ expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+ checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+ }
+
+ @Test
+ public void testDefaultCustomizedHttpResponseHeaders() throws IOException {
+ String headerConfig = "";
+ Map<String, String> expectedHeaders = new HashMap<>();
+ checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+ }
+
+ private void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+ throws IOException {
+ Map<String, String> workerProps = baseWorkerProps();
+ workerProps.put("offset.storage.file.filename", "/tmp");
+ workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+ WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+ EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+ EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+ EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+ workerConfig,
+ ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+ EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+ PowerMock.replayAll();
+
+ server = new RestServer(workerConfig);
+ try {
+ server.initializeServer();
+ server.initializeResources(herder);
+ HttpRequest request = new HttpGet("/connectors");
+ try (CloseableHttpClient httpClient = HttpClients.createMinimal()) {
+ HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
+ try (CloseableHttpResponse response = httpClient.execute(httpHost, request)) {
+ Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+ if (!headerConfig.isEmpty()) {
+ expectedHeaders.forEach((k, v) ->
+ Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
+ } else {
+ Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+ }
+ }
+ }
+ } finally {
+ server.stop();
+ server = null;
+ }
+ }
+
private String executeGet(String host, int port, String endpoint) throws IOException {
HttpRequest request = new HttpGet(endpoint);
CloseableHttpClient httpClient = HttpClients.createMinimal();