You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/05/24 13:57:18 UTC

[kafka] branch trunk updated: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect. (#8620)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2988eac  KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect. (#8620)
2988eac is described below

commit 2988eac0822022578bd2c6c5626bfbda3e8c73a6
Author: Jeff Huang <47...@users.noreply.github.com>
AuthorDate: Sun May 24 06:56:27 2020 -0700

    KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect. (#8620)
    
    Added support for customizing the HTTP response headers for Kafka Connect as described in KIP-577.
    
    Author: Jeff Huang <je...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>
---
 checkstyle/import-control.xml                      |  1 +
 checkstyle/suppressions.xml                        |  3 +
 .../apache/kafka/connect/runtime/WorkerConfig.java | 75 +++++++++++++++++++++-
 .../kafka/connect/runtime/rest/RestServer.java     | 16 +++++
 .../kafka/connect/runtime/WorkerConfigTest.java    | 48 ++++++++++++++
 .../kafka/connect/runtime/rest/RestServerTest.java | 62 +++++++++++++++++-
 6 files changed, 202 insertions(+), 3 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3fd5ea7..7119265 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -362,6 +362,7 @@
       <allow pkg="org.reflections"/>
       <allow pkg="org.reflections.util"/>
       <allow pkg="javax.crypto"/>
+      <allow pkg="org.eclipse.jetty.util" />
 
       <subpackage name="rest">
         <allow pkg="org.eclipse.jetty" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f86cc5f..74e6961 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -144,6 +144,9 @@
     <suppress checks="MethodLength"
               files="Values.java"/>
 
+    <suppress checks="NPathComplexity"
+              files="RestServer.java"/>
+
     <!-- connect tests-->
     <suppress checks="ClassDataAbstractionCoupling"
               files="(DistributedHerder|KafkaBasedLog)Test.java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 347e250..7a4f04e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -34,11 +34,14 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
+import org.eclipse.jetty.util.StringUtil;
+
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
@@ -49,6 +52,9 @@ public class WorkerConfig extends AbstractConfig {
     private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class);
 
     private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");
+    // Actions accepted as the first token of a response header rule; validateHeaderConfigAction
+    // compares against these case-insensitively.
+    private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
+            Arrays.asList("set", "add", "setDate", "addDate")
+    );
 
     public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
     public static final String BOOTSTRAP_SERVERS_DOC
@@ -244,6 +250,10 @@ public class WorkerConfig extends AbstractConfig {
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    // Worker property holding the rules for REST API HTTP response headers; the default is an
+    // empty string (no rules).
+    public static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config";
+    public static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
+    public static final String RESPONSE_HTTP_HEADERS_DEFAULT = "";
+
     /**
      * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
      * bootstrap their own ConfigDef.
@@ -324,7 +334,9 @@ public class WorkerConfig extends AbstractConfig {
                 .define(TOPIC_TRACKING_ENABLE_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ENABLE_DEFAULT,
                         Importance.LOW, TOPIC_TRACKING_ENABLE_DOC)
                 .define(TOPIC_TRACKING_ALLOW_RESET_CONFIG, Type.BOOLEAN, TOPIC_TRACKING_ALLOW_RESET_DEFAULT,
-                        Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC);
+                        Importance.LOW, TOPIC_TRACKING_ALLOW_RESET_DOC)
+                .define(RESPONSE_HTTP_HEADERS_CONFIG, Type.STRING, RESPONSE_HTTP_HEADERS_DEFAULT,
+                        new ResponseHttpHeadersValidator(), Importance.LOW, RESPONSE_HTTP_HEADERS_DOC);
     }
 
     private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
@@ -400,6 +412,48 @@ public class WorkerConfig extends AbstractConfig {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    /**
+     * Validates a single HTTP response header rule of the form
+     * {@code [action] [header name]:[header value]}.
+     *
+     * <p>The rule is trimmed and split on the first run of whitespace into an action and a
+     * name/value pair; the pair must contain exactly one {@code ':'} separator, and the header name
+     * must be non-empty and free of whitespace. Visible for testing.
+     *
+     * @param config the header rule to validate
+     * @throws ConfigException if the rule does not have the expected format, uses an unknown
+     *         action, or has an empty or whitespace-containing header name
+     */
+    static void validateHttpResponseHeaderConfig(String config) {
+        try {
+            // validate format: "<action><whitespace><rest>"
+            String[] configTokens = config.trim().split("\\s+", 2);
+            if (configTokens.length != 2) {
+                throw new ConfigException(String.format("Invalid format of header config '%s'. "
+                        + "Expected: '[action] [header name]:[header value]'", config));
+            }
+
+            // validate action
+            String method = configTokens[0].trim();
+            validateHeaderConfigAction(method);
+
+            // validate header name and header value pair; exactly one ':' is accepted, so a rule
+            // that contains additional colons is rejected
+            String header = configTokens[1];
+            String[] headerTokens = header.trim().split(":");
+            if (headerTokens.length != 2) {
+                throw new ConfigException(
+                        String.format("Invalid format of header name and header value pair '%s'. "
+                                + "Expected: '[header name]:[header value]'", header));
+            }
+
+            // validate header name
+            String headerName = headerTokens[0].trim();
+            if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) {
+                throw new ConfigException(String.format("Invalid header name '%s'. "
+                        + "The '[header name]' cannot contain whitespace", headerName));
+            }
+        } catch (ArrayIndexOutOfBoundsException e) {
+            // Defensive: the length checks above guard every array access in this method.
+            throw new ConfigException(String.format("Invalid header config '%s'.", config), e);
+        }
+    }
+
+    /**
+     * Checks that the given action is one of {@code HEADER_ACTIONS}, ignoring case.
+     * Visible for testing.
+     *
+     * @param action the action token of a header rule
+     * @throws ConfigException if the action matches none of the supported actions
+     */
+    static void validateHeaderConfigAction(String action) {
+        for (String supported : HEADER_ACTIONS) {
+            if (action.equalsIgnoreCase(supported)) {
+                return;
+            }
+        }
+        throw new ConfigException(String.format("Invalid header config action: '%s'. "
+                + "Expected one of %s", action, HEADER_ACTIONS));
+    }
+
     private static class AdminListenersValidator implements ConfigDef.Validator {
         @Override
         public void ensureValid(String name, Object value) {
@@ -427,4 +481,23 @@ public class WorkerConfig extends AbstractConfig {
         }
     }
 
+    /**
+     * Validator for the response header rules setting: splits the configured value into
+     * individual rules and validates each one with
+     * {@link WorkerConfig#validateHttpResponseHeaderConfig(String)}.
+     */
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String rules = (String) value;
+            boolean nothingConfigured = rules == null || rules.trim().isEmpty();
+            if (nothingConfigured) {
+                return;
+            }
+
+            // csvSplit handles and removes surrounding quotes, so a quoted rule may contain commas
+            for (String rule : StringUtil.csvSplit(rules)) {
+                validateHttpResponseHeaderConfig(rule);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Comma-separated header rules, where each header rule is of the form "
+                    + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+                    + "if any part of a header rule contains a comma";
+        }
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 02b4677..408f72e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -46,6 +46,7 @@ import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.CrossOriginFilter;
+import org.eclipse.jetty.servlets.HeaderFilter;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.server.ServerProperties;
@@ -285,6 +286,11 @@ public class RestServer {
             context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
         }
 
+        String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+        if (headerConfig != null && !headerConfig.trim().isEmpty()) {
+            configureHttpResponsHeaderFilter(context);
+        }
+
         RequestLogHandler requestLogHandler = new RequestLogHandler();
         Slf4jRequestLogWriter slf4jRequestLogWriter = new Slf4jRequestLogWriter();
         slf4jRequestLogWriter.setLoggerName(RestServer.class.getCanonicalName());
@@ -472,4 +478,14 @@ public class RestServer {
             return base + path;
     }
 
+    /**
+     * Registers a {@link HeaderFilter} on the given context for all paths, passing the value of
+     * {@link WorkerConfig#RESPONSE_HTTP_HEADERS_CONFIG} as the filter's {@code headerConfig}
+     * init parameter.
+     *
+     * @param context the servlet context handler to add the filter to
+     */
+    protected void configureHttpResponsHeaderFilter(ServletContextHandler context) {
+        FilterHolder holder = new FilterHolder(HeaderFilter.class);
+        holder.setInitParameter("headerConfig", config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG));
+        context.addFilter(holder, "/*", EnumSet.of(DispatcherType.REQUEST));
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
index 33416b9..1eeb13e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
@@ -23,12 +23,38 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
 
 public class WorkerConfigTest {
+    // Header rules that testValidHeaderConfigs expects to pass validation; they vary the action's
+    // letter case and the whitespace around the action, header name, and header value.
+    private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
+            "add \t Cache-Control: no-cache, no-store, must-revalidate",
+            "add \r X-XSS-Protection: 1; mode=block",
+            "\n add Strict-Transport-Security: max-age=31536000; includeSubDomains",
+            "AdD   Strict-Transport-Security:  \r  max-age=31536000;  includeSubDomains",
+            "AdD \t Strict-Transport-Security : \n   max-age=31536000;  includeSubDomains",
+            "add X-Content-Type-Options: \r nosniff",
+            "Set \t X-Frame-Options: \t Deny\n ",
+            "seT \t X-Cache-Info: \t not cacheable\n ",
+            "seTDate \t Expires: \r 31540000000",
+            "adDdate \n Last-Modified: \t 0"
+    );
+
+    // Header rules that testInvalidHeaderConfigs expects to be rejected with a ConfigException.
+    private static final List<String> INVALID_HEADER_CONFIGS = Arrays.asList(
+            "set \t",
+            "badaction \t X-Frame-Options:DENY",
+            "set add X-XSS-Protection:1",
+            "addX-XSS-Protection",
+            "X-XSS-Protection:",
+            "add set X-XSS-Protection: 1",
+            "add X-XSS-Protection:1 X-XSS-Protection:1 ",
+            "add X-XSS-Protection",
+            "set X-Frame-Options:DENY, add  :no-cache, no-store, must-revalidate "
+    );
 
     @Test
     public void testAdminListenersConfigAllowedValues() {
@@ -63,6 +89,28 @@ public class WorkerConfigTest {
         new WorkerConfig(WorkerConfig.baseConfigDef(), props);
     }
 
+    @Test
+    public void testInvalidHeaderConfigs() {
+        // Each malformed rule must be rejected on its own.
+        INVALID_HEADER_CONFIGS.forEach(this::assertInvalidHeaderConfig);
+    }
+
+    @Test
+    public void testValidHeaderConfigs() {
+        // Each well-formed rule must pass validation without throwing.
+        VALID_HEADER_CONFIGS.forEach(this::assertValidHeaderConfig);
+    }
+
+    // Asserts that validating the given header rule throws a ConfigException.
+    private void assertInvalidHeaderConfig(String config) {
+        assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config));
+    }
+
+    // Validates the given header rule; any exception thrown by validation fails the calling test.
+    private void assertValidHeaderConfig(String config) {
+        WorkerConfig.validateHttpResponseHeaderConfig(config);
+    }
+
     private Map<String, String> baseProps() {
         Map<String, String> props = new HashMap<>();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 575c4da..0c81ddd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -65,7 +65,6 @@ import static org.junit.Assert.assertTrue;
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore({"javax.net.ssl.*", "javax.security.*", "javax.crypto.*"})
 public class RestServerTest {
-
     @MockStrict
     private Herder herder;
     @MockStrict
@@ -76,7 +75,9 @@ public class RestServerTest {
 
     @After
     public void tearDown() {
-        server.stop();
+        if (server != null) {
+            server.stop();
+        }
     }
 
     @SuppressWarnings("deprecation")
@@ -400,6 +401,63 @@ public class RestServerTest {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+        Map<String, String> expected = new HashMap<>();
+        expected.put("X-XSS-Protection", "1; mode=block");
+        expected.put("Cache-Control", "no-cache, no-store, must-revalidate");
+
+        // The second rule is double-quoted because its header value contains commas.
+        String rules =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        checkCustomizedHttpResponseHeaders(rules, expected);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        // An empty header config with no expected headers.
+        Map<String, String> noHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders("", noHeaders);
+    }
+
+    /**
+     * Starts a {@link RestServer} whose worker config carries the given header rules, issues
+     * {@code GET /connectors}, and checks the response headers.
+     *
+     * <p>When {@code headerConfig} is non-empty, every entry of {@code expectedHeaders} must be
+     * present on the response with the expected value; otherwise the response must not carry an
+     * {@code X-Frame-Options} header. The server is always stopped before returning.
+     *
+     * @param headerConfig value to set for {@link WorkerConfig#RESPONSE_HTTP_HEADERS_CONFIG}
+     * @param expectedHeaders header names mapped to the values expected on the response
+     * @throws IOException if executing the HTTP request or closing the client fails
+     */
+    private void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        try {
+            server.initializeServer();
+            server.initializeResources(herder);
+            HttpRequest request = new HttpGet("/connectors");
+            try (CloseableHttpClient httpClient = HttpClients.createMinimal()) {
+                HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
+                try (CloseableHttpResponse response = httpClient.execute(httpHost, request)) {
+                    Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+                    if (!headerConfig.isEmpty()) {
+                        expectedHeaders.forEach((headerName, expectedValue) -> {
+                            // Fail with a readable assertion instead of an NPE when the header is absent.
+                            Assert.assertNotNull("Missing response header '" + headerName + "'",
+                                    response.getFirstHeader(headerName));
+                            // JUnit's contract is assertEquals(expected, actual).
+                            Assert.assertEquals(expectedValue, response.getFirstHeader(headerName).getValue());
+                        });
+                    } else {
+                        Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+                    }
+                }
+            }
+        } finally {
+            // Stop here and clear the field so tearDown() does not stop the server a second time.
+            server.stop();
+            server = null;
+        }
+    }
+
     private String executeGet(String host, int port, String endpoint) throws IOException {
         HttpRequest request = new HttpGet(endpoint);
         CloseableHttpClient httpClient = HttpClients.createMinimal();