Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/05 22:27:42 UTC

[GitHub] [kafka] jeffhuang26 opened a new pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

jeffhuang26 opened a new pull request #8620:
URL: https://github.com/apache/kafka/pull/8620


   Added support for customized HTTP response headers for the Kafka Connect REST server.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#issuecomment-632309779


   ok to test





[GitHub] [kafka] rhauch commented on pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#issuecomment-633234287


   Each of the 3 builds had a few known flaky Streams test failures; none were in Connect unit or integration tests.





[GitHub] [kafka] rhauch commented on pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#issuecomment-624682689


   okay to test





[GitHub] [kafka] rhauch commented on pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#issuecomment-632309635


   retest this please





[GitHub] [kafka] jeffhuang26 commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
jeffhuang26 commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r421955594



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the form "
+                + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }
+
+    public static void validateHttpResponseHeaderConfig(String config) {

Review comment:
       This no longer compiles after changing the method to either package-private or protected, because `RestServerTest` calls it from a different package.







[GitHub] [kafka] rhauch commented on pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#issuecomment-629428228


   ok to test





[GitHub] [kafka] rhauch commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r420464820



##########
File path: checkstyle/checkstyle.xml
##########
@@ -132,7 +132,7 @@
     </module>
     <module name="NPathComplexity">
       <!-- default is 200 -->
-      <property name="max" value="500"/>
+      <property name="max" value="550"/>

Review comment:
       Why change the setting instead of modifying `suppressions.xml` to exclude certain classes from this rule?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -244,6 +244,14 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    /**
+     * @link "https://www.eclipse.org/jetty/documentation/current/header-filter.html"
+     * @link "https://www.eclipse.org/jetty/javadoc/9.4.28.v20200408/org/eclipse/jetty/servlets/HeaderFilter.html"
+     **/
+    public static final String RESPONSE_HTTP_HEADERS_CONFIG = "response.http.headers.config";
+    public static final String RESPONSE_HTTP_HEADERS_DOC = "Set values for Jetty HTTP response headers";

Review comment:
       I don't think we should expose `Jetty` here. Yes, we're following the Jetty grammar and format for these, but let's not unnecessarily expose the internals.
   ```suggestion
       public static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST API HTTP response headers";
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##########
@@ -461,4 +469,18 @@ public static String urlJoin(String base, String path) {
             return base + path;
     }
 
+    /**
+     * Register header filter to ServletContextHandler.
+     * @param context The serverlet context handler
+     */
+    protected void configureHttpResponsHeaderFilter(ServletContextHandler context) {
+        String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+        log.debug("headerConfig : " + headerConfig);

Review comment:
       Is this line really necessary? Isn't the `response.http.headers.config` property already logged at INFO level when the worker starts up, via the WorkerConfig (or rather DistributedConfig or StandaloneConfig) constructor?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +410,52 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    public static void validateHttpResponseHeaderConfig(String config) {

Review comment:
       Why not implement these as a `ConfigDef.Validator` implementation, similar to the existing `AdminListenersValidator` below?
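
A minimal sketch of the kind of per-rule check such a validator could run, assuming the `[action] [header name]:[header value]` form described in this thread. It is plain Java so it runs without Kafka on the classpath: the class name `HeaderRuleValidatorSketch` is hypothetical, and `IllegalArgumentException` stands in for `ConfigException`. It is not the code from this pull request.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for a ConfigDef.Validator; not the implementation from this PR.
final class HeaderRuleValidatorSketch {
    // Actions mentioned in this thread, lower-cased for case-insensitive matching.
    private static final List<String> ACTIONS = Arrays.asList("add", "set", "adddate", "setdate");

    // Throws IllegalArgumentException (standing in for ConfigException) if one rule is malformed.
    static void validateRule(String rule) {
        String trimmed = rule.trim();
        int colon = trimmed.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Missing ':' in header rule: " + rule);
        }
        // Everything before the first ':' must be exactly "[action] [header name]".
        String[] actionAndName = trimmed.substring(0, colon).trim().split("\\s+");
        if (actionAndName.length != 2) {
            throw new IllegalArgumentException("Expected '[action] [header name]' in: " + rule);
        }
        if (!ACTIONS.contains(actionAndName[0].toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException("Unknown action in header rule: " + rule);
        }
        if (trimmed.substring(colon + 1).trim().isEmpty()) {
            throw new IllegalArgumentException("Missing header value in: " + rule);
        }
    }

    // Convenience wrapper so callers and tests can branch on a boolean.
    static boolean isValid(String rule) {
        try {
            validateRule(rule);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("add X-XSS-Protection: 1; mode=block"));
        System.out.println(isValid("badaction X-XSS-Protection: 1; mode=block"));
    }
}
```

In a real worker config, the same logic would presumably sit behind the validator attached to `response.http.headers.config`, so that a bad value fails when the config object is constructed.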

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void TestValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void TestDefaultCustomizedHttpResponseHeaders() throws IOException  {

Review comment:
       Nit:
   ```suggestion
       public void testDefaultCustomizedHttpResponseHeaders() throws IOException  {
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void TestValidCustomizedHttpResponseHeaders() throws IOException  {

Review comment:
       Nit:
   ```suggestion
       public void testValidCustomizedHttpResponseHeaders() throws IOException  {
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void TestValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void TestDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "add X-XSS-Protection";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        server.stop();
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigAction() {
+        String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        server.stop();
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);

Review comment:
       The advantage of using a `ConfigDef.Validator` on the `response.http.headers.config` config key is that this constructor call would throw an exception if any invalid value is used, and much sooner, too.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void TestValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void TestDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "add X-XSS-Protection";

Review comment:
       Might be nice to have quite a few of these tests that verify various values are invalid and valid, to act as regression tests.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void TestValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void TestDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "add X-XSS-Protection";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);

Review comment:
       Should we have tests for the `DistributedConfig` class? Again, much of the logic should be the same, but the tests would each be simpler if using a `ConfigDef.Validator`.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##########
@@ -461,4 +469,18 @@ public static String urlJoin(String base, String path) {
             return base + path;
     }
 
+    /**
+     * Register header filter to ServletContextHandler.
+     * @param context The serverlet context handler
+     */
+    protected void configureHttpResponsHeaderFilter(ServletContextHandler context) {
+        String headerConfig = config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+        log.debug("headerConfig : " + headerConfig);
+        String[] configs = StringUtil.csvSplit(headerConfig);
+        Arrays.stream(configs)
+                .forEach(WorkerConfig::validateHttpResponseHeaderConfig);

Review comment:
       Is there a reason we don't want to validate these properties up front when all of the other configuration validation is being performed, via `ConfigDef.Validator` on `response.http.headers.config`? If we do that, we don't need this line.
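
For context on the split step in the diff above, here is a simplified, Jetty-free sketch of quote-aware comma splitting. It assumes only the rule-list form used in this thread, where a rule containing commas is wrapped in double quotes. The class `HeaderRuleSplitSketch` is hypothetical, and this is an approximation, not a reimplementation of Jetty's `StringUtil.csvSplit`, which may treat escapes and other edge cases differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a header-rule list on commas that are outside double quotes.
final class HeaderRuleSplitSketch {
    static List<String> splitRules(String config) {
        List<String> rules = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < config.length(); i++) {
            char c = config.charAt(i);
            if (c == '"') {
                // Quotes only delimit a rule; they are dropped from the result.
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                addIfNotBlank(rules, current);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(rules, current);
        return rules;
    }

    // Trims the accumulated rule and skips it if nothing is left.
    private static void addIfNotBlank(List<String> rules, StringBuilder sb) {
        String rule = sb.toString().trim();
        if (!rule.isEmpty()) {
            rules.add(rule);
        }
    }

    public static void main(String[] args) {
        String config = "add X-XSS-Protection: 1; mode=block, "
                + "\"add Cache-Control: no-cache, no-store, must-revalidate\"";
        splitRules(config).forEach(System.out::println);
    }
}
```

Each resulting rule could then be handed to a per-rule validator, whether that runs at server start-up or earlier during config validation.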

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void TestValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void TestDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "add X-XSS-Protection";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        server.stop();
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigAction() {
+        String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new StandaloneConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        server.stop();

Review comment:
       If using `ConfigDef.Validator`, all of these lines would go away, and we actually don't need mocks of any kind.







[GitHub] [kafka] jeffhuang26 commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
jeffhuang26 commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r422511289



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the form "
+                + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }
+
+    public static void validateHttpResponseHeaderConfig(String config) {

Review comment:
       Actually, these header config verification tests should move to `WorkerConfigTest`.








[GitHub] [kafka] rhauch merged pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #8620:
URL: https://github.com/apache/kafka/pull/8620


   





[GitHub] [kafka] rhauch commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r421758877



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the form "
+                + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }

Review comment:
       I think this method should be on `ResponseHttpHeadersValidator`, not the `WorkerConfig`.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -54,17 +55,40 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.List;
 
 import static org.apache.kafka.connect.runtime.WorkerConfig.ADMIN_LISTENERS_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThrows;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore({"javax.net.ssl.*", "javax.security.*", "javax.crypto.*"})
 public class RestServerTest {
+    protected static final String WHITESPACE = " \t \n \r ";
+    protected static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
+            "add \t Cache-Control: no-cache, no-store, must-revalidate",
+            "add \r X-XSS-Protection: 1; mode=block",
+            "\n add Strict-Transport-Security: max-age=31536000; includeSubDomains",
+            "AdD   Strict-Transport-Security:  \r  max-age=31536000;  includeSubDomains",
+            "AdD \t Strict-Transport-Security : \n   max-age=31536000;  includeSubDomains",
+            "add X-Content-Type-Options: \r nosniff"

Review comment:
       What about adding valid configs for `set`, `addDate` and `setDate`, too?
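
A small sketch of what such extra entries might look like. The sample values follow the example in Jetty's `HeaderFilter` documentation, where `setDate` and `addDate` take a number of milliseconds to add to the current time. The class `DateActionConfigSketch` and its numeric check are hypothetical illustrations and are not part of this pull request.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical illustration of candidate configs for the set, setDate and addDate actions.
final class DateActionConfigSketch {
    // Sample values modeled on the example in Jetty's HeaderFilter documentation.
    static final List<String> CANDIDATES = Arrays.asList(
            "set X-Frame-Options: DENY",
            "setDate Expires: 31540000000",
            "addDate Date: 0");

    // Assumed extra check: date actions carry a numeric millisecond offset as their value.
    static boolean hasNumericValueIfDateAction(String rule) {
        String[] actionAndRest = rule.trim().split("\\s+", 2);
        if (actionAndRest.length != 2) {
            return false;
        }
        String[] nameAndValue = actionAndRest[1].split(":", 2);
        if (nameAndValue.length != 2) {
            return false;
        }
        String action = actionAndRest[0].toLowerCase(Locale.ROOT);
        if (!action.equals("adddate") && !action.equals("setdate")) {
            return true; // not a date action, so any value passes this particular check
        }
        try {
            Long.parseLong(nameAndValue[1].trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String rule : CANDIDATES) {
            System.out.println(rule + " -> " + hasNumericValueIfDateAction(rule));
        }
    }
}
```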

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the form "
+                + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }
+
+    public static void validateHttpResponseHeaderConfig(String config) {

Review comment:
       This could be package-private, right? The only place it should be called is in `ResponseHttpHeadersValidator` and in tests.
   ```suggestion
       // Visible for testing
       static void validateHttpResponseHeaderConfig(String config) {
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +395,98 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "set add X-XSS-Protection: 1";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedAction() {
+        String headerConfig = "X-Frame-Options: DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderName() {
+        String headerConfig = "add :DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderValue() {
+        String headerConfig = "add X-Frame-Options";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigAction() {
+        String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        CloseableHttpClient httpClient = HttpClients.createMinimal();
+        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                    Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+        }
+        response.close();
+        server.stop();
+    }
+

Review comment:
       Looks good. I like the additional checking that you're doing here.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +418,80 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testInvalidHeaderConfigs() {
+        for (String config : INVALID_HEADER_CONFIGS) {
+            assertInvalidHeaderConfig(config);
+        }
+    }
+
+    @Test
+    public void testValidHeaderConfigs() {
+        for (String config : VALID_HEADER_CONFIGS) {
+            assertValidHeaderConfig(config);
+        }
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        CloseableHttpClient httpClient = HttpClients.createMinimal();
+        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                    Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+        }
+        response.close();
+        server.stop();

Review comment:
       How about putting the `server.stop()` and `server = null` in a finally block? Also, `CloseableHttpResponse` is `AutoCloseable`, so we could actually use a try-with-resources here:
   ```suggestion
           server = new RestServer(workerConfig);
           try {
             server.initializeServer();
             server.initializeResources(herder);
             HttpRequest request = new HttpGet("/connectors");
             try (CloseableHttpClient httpClient = HttpClients.createMinimal()) {
               HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
               try (CloseableHttpResponse response = httpClient.execute(httpHost, request)) {
                 Assert.assertEquals(200, response.getStatusLine().getStatusCode());
                 if (!headerConfig.isEmpty()) {
                     expectedHeaders.forEach((k, v) ->
                             Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
                 } else {
                     Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
                 }
               }
             }
           } finally {
             server.stop();
             server = null;
           }
   ```
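
A minimal sketch of the cleanup order this suggestion relies on, using only the JDK. `Resource` and `FakeServer` are hypothetical stand-ins for the HTTP client/response and for `RestServer`; they are not classes from the PR. The try-with-resources block closes its resources in reverse order before the exception reaches the outer handler, and the `finally` block still stops the server when an assertion fails.

```java
import java.util.ArrayList;
import java.util.List;

final class CleanupOrderSketch {
    // Records lifecycle events so the cleanup order can be inspected.
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical stand-in for an AutoCloseable such as an HTTP client or response.
    static final class Resource implements AutoCloseable {
        private final String name;

        Resource(String name) {
            this.name = name;
            EVENTS.add("open " + name);
        }

        @Override
        public void close() {
            EVENTS.add("close " + name);
        }
    }

    // Hypothetical stand-in for a server that has stop() but is not AutoCloseable.
    static final class FakeServer {
        void stop() {
            EVENTS.add("stop server");
        }
    }

    static List<String> run(boolean failAssertion) {
        EVENTS.clear();
        FakeServer server = new FakeServer();
        try {
            try (Resource client = new Resource("client");
                 Resource response = new Resource("response")) {
                if (failAssertion) {
                    throw new AssertionError("simulated failure");
                }
            }
        } catch (AssertionError e) {
            // Reached only after both resources have been closed.
            EVENTS.add("caught " + e.getMessage());
        } finally {
            // Runs on both the success path and the failure path.
            server.stop();
        }
        return new ArrayList<>(EVENTS);
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In the real test the `catch` would be omitted so that the assertion failure still fails the test; it is here only to make the ordering observable.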

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the form "
+                + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }
+
+    public static void validateHttpResponseHeaderConfig(String config) {
+        try {
+            // validate format
+            String[] configTokens = config.trim().split("\\s+", 2);
+            if (configTokens.length != 2) {
+                throw new ConfigException(String.format("Invalid format of header config '%s\'. "
+                        + "Expected: '[action] [header name]:[header value]'", config));
+            }
+
+            // validate action
+            String method = configTokens[0].trim();
+            validateHeaderConfigAction(method);
+
+            // validate header name and header value pair
+            String header = configTokens[1];
+            String[] headerTokens = header.trim().split(":");
+            if (headerTokens.length != 2) {
+                throw new ConfigException(
+                        String.format("Invalid format of header name and header value pair '%s'. "
+                                + "Expected: '[header name]:[header value]'", header));
+            }
+
+            // validate header name
+            String headerName = headerTokens[0].trim();
+            if (headerName.isEmpty() || headerName.matches(".*\\s+.*")) {
+                throw new ConfigException(String.format("Invalid header name '%s'. "
+                        + "The '[header name]' cannot contain whitespace", headerName));
+            }
+        } catch (ArrayIndexOutOfBoundsException e) {
+            throw new ConfigException(String.format("Invalid header config '%s'.", config), e);
+        }
+    }
+
+    public static void validateHeaderConfigAction(String action) {

Review comment:
       This should be package-level protected:
   ```suggestion
       // Visible for testing
       static void validateHeaderConfigAction(String action) {
   ```







[GitHub] [kafka] rhauch commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r422385330



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -400,6 +412,53 @@ public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         logInternalConverterDeprecationWarnings(props);
     }
 
+    @Override
+    public String toString() {
+        return "Comma-separated header rules, where each header rule is of the form "
+                + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
+                + "if any part of a header rule contains a comma";
+    }
+
+    public static void validateHttpResponseHeaderConfig(String config) {

Review comment:
       Then how about making these package-level protected (as I mentioned) and moving the tests of these methods to `WorkerConfigTest` instead?







[GitHub] [kafka] jeffhuang26 commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
jeffhuang26 commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r421147218



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +395,98 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "set add X-XSS-Protection: 1";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedAction() {
+        String headerConfig = "X-Frame-Options: DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderName() {
+        String headerConfig = "add :DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderValue() {
+        String headerConfig = "add X-Frame-Options";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigAction() {
+        String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        CloseableHttpClient httpClient = HttpClients.createMinimal();
+        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                    Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+        }
+        response.close();
+        server.stop();
+    }
+

Review comment:
       It is a good idea to assert each check using assertValidHeaderConfig and assertInvalidHeaderConfig. I think the latest version covers validation of the header config. Please let me know. For all valid header config cases, testValidCustomizedHttpResponseHeaders should cover both config validation at the config phase and validation at the FilterHolder layer.
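
A rough sketch of what table-driven checks in the spirit of `assertValidHeaderConfig` and `assertInvalidHeaderConfig` might look like. The bodies of those helpers are not shown in this thread, so everything below is an assumption: `validate` is a simplified copy of the validation steps visible in the diff, `IllegalArgumentException` stands in for `ConfigException`, and the accepted action names in `ACTIONS` are a guess, since the diff does not list them.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class HeaderRuleChecksSketch {
    // Assumed action names; the diff does not show which actions are accepted.
    static final Set<String> ACTIONS =
            new HashSet<>(Arrays.asList("set", "add", "setDate", "addDate"));

    // Simplified version of the validation steps shown in the diff.
    static void validate(String rule) {
        String[] tokens = rule.trim().split("\\s+", 2);
        if (tokens.length != 2) {
            throw new IllegalArgumentException("Invalid format of header config '" + rule + "'");
        }
        if (!ACTIONS.contains(tokens[0])) {
            throw new IllegalArgumentException("Invalid action '" + tokens[0] + "'");
        }
        String[] header = tokens[1].trim().split(":");
        if (header.length != 2) {
            throw new IllegalArgumentException("Invalid header pair '" + tokens[1] + "'");
        }
        String name = header[0].trim();
        if (name.isEmpty() || name.matches(".*\\s+.*")) {
            throw new IllegalArgumentException("Invalid header name '" + name + "'");
        }
    }

    static boolean isValid(String rule) {
        try {
            validate(rule);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Rules taken from the test cases quoted in this thread.
        List<String> valid = Arrays.asList(
                "add X-XSS-Protection: 1; mode=block",
                "add Cache-Control: no-cache, no-store, must-revalidate");
        List<String> invalid = Arrays.asList(
                "set add X-XSS-Protection: 1",
                "X-Frame-Options: DENY",
                "add :DENY",
                "add X-Frame-Options",
                "badaction X-XSS-Protection: 1; mode=block");
        valid.forEach(r -> System.out.println(isValid(r) + "  " + r));
        invalid.forEach(r -> System.out.println(isValid(r) + "  " + r));
    }
}
```

Keeping the rules in two lists makes it cheap to add a positive or negative case without writing another `@Test(expected = ...)` method per rule.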







[GitHub] [kafka] rhauch commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r428877515



##########
File path: checkstyle/import-control.xml
##########
@@ -362,6 +362,7 @@
       <allow pkg="org.reflections"/>
       <allow pkg="org.reflections.util"/>
       <allow pkg="javax.crypto"/>
+      <allow pkg="org.eclipse.jetty.util" />

Review comment:
       FYI: this new dependency has no transitive dependencies







[GitHub] [kafka] jeffhuang26 commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
jeffhuang26 commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r421147218



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +395,98 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "set add X-XSS-Protection: 1";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedAction() {
+        String headerConfig = "X-Frame-Options: DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderName() {
+        String headerConfig = "add :DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderValue() {
+        String headerConfig = "add X-Frame-Options";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigAction() {
+        String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        CloseableHttpClient httpClient = HttpClients.createMinimal();
+        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                    Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+        }
+        response.close();
+        server.stop();
+    }
+

Review comment:
       It is a good idea to assert each check using assertValidHeaderConfig and assertInvalidHeaderConfig. I think the latest version covers validation of the header config. Please let me know.







[GitHub] [kafka] rhauch commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r420837649



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue);

Review comment:
       ~How about using `strValue.split(',')`, and avoiding the need to import `org.eclipse.jetty.util.StringUtil`?~
   
   Okay, I see now that `csvSplit` handles quotes. How about a comment:
   ```suggestion
               String[] configs = StringUtil.csvSplit(strValue); // handles and removes surrounding quotes
   ```
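
To illustrate why a plain `split(",")` is not enough here, the following is a minimal quote-aware splitter written against the JDK only. It is not Jetty's `StringUtil.csvSplit` implementation and makes no claim to match its edge-case behavior; `QuoteAwareSplitSketch` is a hypothetical name used just for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class QuoteAwareSplitSketch {
    // Splits on commas that are outside double quotes, drops the quotes,
    // and trims each resulting token.
    static List<String> split(String value) {
        List<String> out = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes; // toggle state and drop the quote character
            } else if (c == ',' && !inQuotes) {
                out.add(current.toString().trim());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        out.add(current.toString().trim());
        return out;
    }

    public static void main(String[] args) {
        String config = "add X-XSS-Protection: 1; mode=block, "
                + "\"add Cache-Control: no-cache, no-store, must-revalidate\"";
        // A naive split breaks the quoted Cache-Control rule at its inner commas.
        System.out.println(config.split(",").length);
        // The quote-aware split keeps each header rule intact.
        split(config).forEach(System.out::println);
    }
}
```

With the header config used in `testValidCustomizedHttpResponseHeaders`, the naive split produces four fragments while the quote-aware split yields the two intended header rules.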

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue);
+            Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config));
+        }
+
+        private static void validateHttpResponseHeaderConfig(String config) {
+            try {
+                // validate format
+                String[] configTokens = config.trim().split("\\s+", 2);
+                if (configTokens.length != 2) {
+                    throw new ConfigException(String.format("Invalid format of header config \"%s\". "
+                            + "Expected: \"[action] [header name]:[header value]\"", config));
+                }
+
+                // validate action
+                String method = configTokens[0].trim();
+                validateHeaderConfigAction(method);
+
+                // validate header name and header value pair
+                String header = configTokens[1];
+                String[] headerTokens = header.trim().split(":");
+                if (headerTokens.length != 2) {
+                    throw new ConfigException(
+                            String.format("Invalid format of header name and header value pair \"%s\". "
+                                    + "Expected: \"[header name]:[header value]\"", header));
+                }
+
+                // validate header name
+                String headerName = headerTokens[0].trim();
+                if (headerName.isEmpty() || headerName.contains(" ")) {
+                    throw new ConfigException(String.format("Invalid header name \"%s\". "
+                            + "The \"[header name]\" cannot contain whitespace", headerName));

Review comment:
       Nit:
   ```suggestion
                       throw new ConfigException(String.format("Invalid header name '%s'. "
                               + "The '[header name]' cannot contain whitespace", headerName));
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue);
+            Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config));
+        }
+
+        private static void validateHttpResponseHeaderConfig(String config) {
+            try {
+                // validate format
+                String[] configTokens = config.trim().split("\\s+", 2);
+                if (configTokens.length != 2) {
+                    throw new ConfigException(String.format("Invalid format of header config \"%s\". "
+                            + "Expected: \"[action] [header name]:[header value]\"", config));
+                }
+
+                // validate action
+                String method = configTokens[0].trim();
+                validateHeaderConfigAction(method);
+
+                // validate header name and header value pair
+                String header = configTokens[1];
+                String[] headerTokens = header.trim().split(":");
+                if (headerTokens.length != 2) {
+                    throw new ConfigException(
+                            String.format("Invalid format of header name and header value pair \"%s\". "
+                                    + "Expected: \"[header name]:[header value]\"", header));
+                }
+
+                // validate header name
+                String headerName = headerTokens[0].trim();
+                if (headerName.isEmpty() || headerName.contains(" ")) {
+                    throw new ConfigException(String.format("Invalid header name \"%s\". "
+                            + "The \"[header name]\" cannot contain whitespace", headerName));
+                }
+            } catch (ArrayIndexOutOfBoundsException e) {
+                throw new ConfigException(String.format("Invalid header config \"%s\".", config), e);

Review comment:
       Nit:
   ```suggestion
                   throw new ConfigException(String.format("Invalid header config '%s'", config), e);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }

Review comment:
       Should probably add this as the first bit in this method:
   ```suggestion
               String strValue = (String) value;
               if (value == null || strValue.trim().isEmpty()) {
                   return;
               }
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue);
+            Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config));
+        }
+
+        private static void validateHttpResponseHeaderConfig(String config) {
+            try {
+                // validate format
+                String[] configTokens = config.trim().split("\\s+", 2);
+                if (configTokens.length != 2) {
+                    throw new ConfigException(String.format("Invalid format of header config \"%s\". "
+                            + "Expected: \"[action] [header name]:[header value]\"", config));
+                }
+
+                // validate action
+                String method = configTokens[0].trim();
+                validateHeaderConfigAction(method);
+
+                // validate header name and header value pair
+                String header = configTokens[1];
+                String[] headerTokens = header.trim().split(":");
+                if (headerTokens.length != 2) {
+                    throw new ConfigException(
+                            String.format("Invalid format of header name and header value pair \"%s\". "
+                                    + "Expected: \"[header name]:[header value]\"", header));

Review comment:
       Nit:
   ```suggestion
                               String.format("Invalid format of header name and header value pair '%s'. "
                                       + "Expected: '[header name]:[header value]'", header));
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue);
+            Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config));
+        }
+
+        private static void validateHttpResponseHeaderConfig(String config) {

Review comment:
       There's no benefit to having the `validateHttpResponseHeaderConfig(...)` method static, so how about making it non-static? And you can use a method reference to make it a tiny bit more readable:
   ```suggestion
               Arrays.stream(configs).forEach(this::validateHttpResponseHeaderConfig);
           }
   
           private void validateHttpResponseHeaderConfig(String config) {
   ```
   
   Or, if you want to make it easier to test, move it to the `WorkerConfig` class and make it a package-private static method:
   ```
               Arrays.stream(configs).forEach(WorkerConfig::validateHttpResponseHeaderConfig);
   ```
   I actually think this is the best way to go, because then you can easily add lots of test methods that thoroughly test each of these methods for both positive and negative cases.
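
To illustrate why a package-private static method is easier to test, here is a minimal stand-alone sketch. It mirrors the format checks from the diff, but the class name is hypothetical, `IllegalArgumentException` stands in for Kafka's `ConfigException`, and validation of the action is left out:

```java
// Hypothetical stand-alone sketch; not the PR's code.
// IllegalArgumentException stands in for ConfigException.
final class HeaderRuleCheck {
    // Static and package-private, so a test in the same package can call it directly.
    static void validateHttpResponseHeaderConfig(String config) {
        // Split into "[action]" and "[header name]:[header value]".
        String[] configTokens = config.trim().split("\\s+", 2);
        if (configTokens.length != 2) {
            throw new IllegalArgumentException("Invalid format of header config '" + config + "'");
        }
        // Split the remainder into header name and header value.
        String[] headerTokens = configTokens[1].trim().split(":");
        if (headerTokens.length != 2) {
            throw new IllegalArgumentException(
                    "Invalid header name and header value pair '" + configTokens[1] + "'");
        }
        // The header name must be non-empty and free of whitespace.
        String headerName = headerTokens[0].trim();
        if (headerName.isEmpty() || headerName.chars().anyMatch(Character::isWhitespace)) {
            throw new IllegalArgumentException("Invalid header name '" + headerName + "'");
        }
    }
}
```

A test can then call `HeaderRuleCheck.validateHttpResponseHeaderConfig("add X-XSS-Protection: 1; mode=block")` for a positive case, or pass `"add :DENY"` and expect an exception, without constructing a full worker configuration.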

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue);
+            Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config));
+        }
+
+        private static void validateHttpResponseHeaderConfig(String config) {
+            try {
+                // validate format
+                String[] configTokens = config.trim().split("\\s+", 2);
+                if (configTokens.length != 2) {
+                    throw new ConfigException(String.format("Invalid format of header config \"%s\". "
+                            + "Expected: \"[ation] [header name]:[header value]\"", config));

Review comment:
       Nits:
   ```suggestion
                       throw new ConfigException(String.format("Invalid format of header config '%s'. "
                               + "Expected: '[action] [header name]:[header value]'", config));
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue);
+            Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config));
+        }
+
+        private static void validateHttpResponseHeaderConfig(String config) {
+            try {
+                // validate format
+                String[] configTokens = config.trim().split("\\s+", 2);
+                if (configTokens.length != 2) {
+                    throw new ConfigException(String.format("Invalid format of header config \"%s\". "
+                            + "Expected: \"[ation] [header name]:[header value]\"", config));
+                }
+
+                // validate action
+                String method = configTokens[0].trim();
+                validateHeaderConfigAction(method);
+
+                // validate header name and header value pair
+                String header = configTokens[1];
+                String[] headerTokens = header.trim().split(":");
+                if (headerTokens.length != 2) {
+                    throw new ConfigException(
+                            String.format("Invalid format of header name and header value pair \"%s\". "
+                                    + "Expected: \"[header name]:[header value]\"", header));
+                }
+
+                // validate header name
+                String headerName = headerTokens[0].trim();
+                if (headerName.isEmpty() || headerName.contains(" ")) {

Review comment:
       Shouldn't this look for other whitespace characters, per the exception message? Something like:
   ```suggestion
                   if (headerName.isEmpty() || headerName.matches("(?s).*\\s.*")) {
   ```
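
Worth keeping in mind for any regex-based variant of this check: `String.matches` succeeds only when the entire string matches the pattern, so a bare `\\s` pattern is true only for a string that is exactly one whitespace character. A minimal sketch of a check that finds whitespace anywhere in the name, assuming a hypothetical helper class that is not part of the PR:

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the PR: shows whole-string vs. substring matching.
final class HeaderNameWhitespaceCheck {
    private static final Pattern WHITESPACE = Pattern.compile("\\s");

    // True if the name contains at least one whitespace character (space, tab, newline, ...).
    static boolean containsWhitespace(String headerName) {
        return WHITESPACE.matcher(headerName).find();
    }

    public static void main(String[] args) {
        System.out.println(containsWhitespace("X-Frame-Options"));  // false
        System.out.println(containsWhitespace("X-Frame\tOptions")); // true
        // matches() must consume the whole input, so this is false despite the tab:
        System.out.println("X-Frame\tOptions".matches("\\s"));      // false
    }
}
```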

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String strValue = (String) value;
+            if (strValue.isEmpty()) {
+                return;
+            }
+
+            String[] configs = StringUtil.csvSplit(strValue);
+            Arrays.stream(configs).forEach(config -> validateHttpResponseHeaderConfig(config));
+        }
+
+        private static void validateHttpResponseHeaderConfig(String config) {
+            try {
+                // validate format
+                String[] configTokens = config.trim().split("\\s+", 2);
+                if (configTokens.length != 2) {
+                    throw new ConfigException(String.format("Invalid format of header config \"%s\". "
+                            + "Expected: \"[ation] [header name]:[header value]\"", config));
+                }
+
+                // validate action
+                String method = configTokens[0].trim();
+                validateHeaderConfigAction(method);
+
+                // validate header name and header value pair
+                String header = configTokens[1];
+                String[] headerTokens = header.trim().split(":");
+                if (headerTokens.length != 2) {
+                    throw new ConfigException(
+                            String.format("Invalid format of header name and header value pair \"%s\". "
+                                    + "Expected: \"[header name]:[header value]\"", header));
+                }
+
+                // validate header name
+                String headerName = headerTokens[0].trim();
+                if (headerName.isEmpty() || headerName.contains(" ")) {
+                    throw new ConfigException(String.format("Invalid header name \"%s\". "
+                            + "The \"[header name]\" cannot contain whitespace", headerName));
+                }
+            } catch (ArrayIndexOutOfBoundsException e) {
+                throw new ConfigException(String.format("Invalid header config \"%s\".", config), e);
+            }
+        }
+
+        private static void validateHeaderConfigAction(String action) {
+            /**
+             * The following actions are defined following link.
+             * {@link https://www.eclipse.org/jetty/documentation/current/header-filter.html}
+             **/
+            if (!Arrays.asList("set", "add", "setDate", "addDate")
+                    .stream()
+                    .anyMatch(action::equalsIgnoreCase)) {
+                throw new ConfigException(String.format("Invalid header config action: \"%s\". "
+                        + "The action need be one of [\"set\", \"add\", \"setDate\", \"addDate\"]", action));

Review comment:
       How about defining a static immutable list as a constant:
   ```
       private static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
               Arrays.asList("set", "add", "setDate", "addDate")
       );
   ```
   so that these lines can become:
   ```suggestion
               if (!HEADER_ACTIONS.stream().anyMatch(action::equalsIgnoreCase)) {
                   throw new ConfigException(String.format("Invalid header config action: '%s'. "
                           + "Expected one of %s", action, HEADER_ACTIONS));
   ```
   This eliminates the duplication of the literal values (which is prone to future errors) and makes the code more readable.
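
Put together, the constant and the check could look roughly like the sketch below. It is self-contained for illustration only: the class name is hypothetical, `IllegalArgumentException` stands in for Kafka's `ConfigException`, and `noneMatch` is used as an equivalent of the negated `anyMatch`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-alone sketch; the real code would live in WorkerConfig
// and throw ConfigException instead of IllegalArgumentException.
final class HeaderActionCheck {
    // Single source of truth for the allowed actions.
    static final Collection<String> HEADER_ACTIONS = Collections.unmodifiableList(
            Arrays.asList("set", "add", "setDate", "addDate")
    );

    // Accepts any allowed action, ignoring case; rejects everything else.
    static void validateHeaderConfigAction(String action) {
        if (HEADER_ACTIONS.stream().noneMatch(action::equalsIgnoreCase)) {
            throw new IllegalArgumentException(String.format(
                    "Invalid header config action: '%s'. Expected one of %s", action, HEADER_ACTIONS));
        }
    }
}
```

Because the message is built from `HEADER_ACTIONS`, adding or removing an action later changes the check and the error text together.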

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -427,4 +435,62 @@ public void ensureValid(String name, Object value) {
         }
     }
 
+    private static class ResponseHttpHeadersValidator implements ConfigDef.Validator {

Review comment:
       This class should have a `toString()` method that describes what's required, so some string like:
   > Comma-separated header rules, where each header rule is of the form '[action] [header name]:[header value]' and optionally surrounded by double quotes if any part of a header rule contains a comma
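
A rough sketch of what that could look like. The `Validator` interface below is a local stand-in for `ConfigDef.Validator` so that the example compiles on its own, the class name is hypothetical, and the validation body is omitted:

```java
// Local stand-in for org.apache.kafka.common.config.ConfigDef.Validator
// (an assumption so this sketch is self-contained).
interface Validator {
    void ensureValid(String name, Object value);
}

// Hypothetical sketch of the validator with a descriptive toString().
final class ResponseHttpHeadersValidatorSketch implements Validator {
    @Override
    public void ensureValid(String name, Object value) {
        // Validation of each header rule is omitted in this sketch.
    }

    // Describes the accepted format in human-readable form.
    @Override
    public String toString() {
        return "Comma-separated header rules, where each header rule is of the form "
                + "'[action] [header name]:[header value]' and optionally surrounded by double quotes "
                + "if any part of a header rule contains a comma";
    }
}
```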

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -392,6 +395,98 @@ public void testDisableAdminEndpoint() throws IOException {
         Assert.assertEquals(404, response.getStatusLine().getStatusCode());
     }
 
+    @Test
+    public void testValidCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig =
+                "add X-XSS-Protection: 1; mode=block, \"add Cache-Control: no-cache, no-store, must-revalidate\"";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+        expectedHeaders.put("Cache-Control", "no-cache, no-store, must-revalidate");
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test
+    public void testDefaultCustomizedHttpResponseHeaders() throws IOException  {
+        String headerConfig = "";
+        Map<String, String> expectedHeaders = new HashMap<>();
+        checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigFormat() {
+        String headerConfig = "set add X-XSS-Protection: 1";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedAction() {
+        String headerConfig = "X-Frame-Options: DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderName() {
+        String headerConfig = "add :DENY";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testMissedHeaderValue() {
+        String headerConfig = "add X-Frame-Options";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testInvalidHeaderConfigAction() {
+        String headerConfig = "badaction X-XSS-Protection: 1; mode=block";
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+    }
+
+    public void checkCustomizedHttpResponseHeaders(String headerConfig, Map<String, String> expectedHeaders)
+            throws IOException  {
+        Map<String, String> workerProps = baseWorkerProps();
+        workerProps.put("offset.storage.file.filename", "/tmp");
+        workerProps.put(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG, headerConfig);
+        WorkerConfig workerConfig = new DistributedConfig(workerProps);
+
+        EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
+                workerConfig,
+                ConnectRestExtension.class)).andStubReturn(Collections.emptyList());
+
+        EasyMock.expect(herder.connectors()).andReturn(Arrays.asList("a", "b"));
+
+        PowerMock.replayAll();
+
+        server = new RestServer(workerConfig);
+        server.initializeServer();
+        server.initializeResources(herder);
+        HttpRequest request = new HttpGet("/connectors");
+        CloseableHttpClient httpClient = HttpClients.createMinimal();
+        HttpHost httpHost = new HttpHost(server.advertisedUrl().getHost(), server.advertisedUrl().getPort());
+        CloseableHttpResponse response = httpClient.execute(httpHost, request);
+        Assert.assertEquals(200, response.getStatusLine().getStatusCode());
+        if (!headerConfig.isEmpty()) {
+            expectedHeaders.forEach((k, v) ->
+                    Assert.assertEquals(response.getFirstHeader(k).getValue(), v));
+        } else {
+            Assert.assertNull(response.getFirstHeader("X-Frame-Options"));
+        }
+        response.close();
+        server.stop();
+    }
+

Review comment:
       These are great and very useful, but they still don't thoroughly test the `validateHttpResponseHeaderConfig` and `validateHeaderConfigAction` methods. If you make these methods static and package-private in the `WorkerConfig` class (rather than in a nested class), then you can easily add more tests to `WorkerConfigTest` that thoroughly verify all of the logic in those methods.
   
   For example, something like the following in `WorkerConfigTest`:
   ```
       protected static final List<String> VALID_UNQUOTED_HEADER_CONFIGS = Arrays.asList(
               // TODO: Add a lot more valid header configs
               "\"add Cache-Control: no-cache, no-store, must-revalidate\"",
               "add X-XSS-Protection: 1; mode=block",
               "add Strict-Transport-Security: max-age=31536000; includeSubDomains",
               "AdD   Strict-Transport-Security:    max-age=31536000;  includeSubDomains",
               "AdD \t Strict-Transport-Security : \n   max-age=31536000;  includeSubDomains",
               "add X-Content-Type-Options: nosniff"
       );
   
       protected static final List<String> VALID_QUOTE_REQUIRED_HEADER_CONFIGS = Arrays.asList(
               // TODO: Add a lot more valid header configs
               "add Cache-Control: no-cache, no-store, must-revalidate"
       );
   
       protected static final List<String> INVALID_UNQUOTED_HEADER_CONFIGS = Arrays.asList(
               // TODO: Add a lot more invalid header configs
               "WRONG Cache-Control: no-cache",
               "add Cache-Control no-cache",
               "WRONG Cache-Control: no-cache, no-store, must-revalidate"
       );
   
       protected static final String WHITESPACE = " \t \n  ";
   
       @Test
       public void testSingleValidHeaderConfigs() {
           for (String config : VALID_UNQUOTED_HEADER_CONFIGS) {
               assertValidHeaderConfig(config);
           }
           for (String config : VALID_QUOTE_REQUIRED_HEADER_CONFIGS) {
               assertValidHeaderConfig("\"" + config + "\"");
           }
       }
   
       @Test
       public void testSingleValidHeaderConfigsWithWhitespace() {
           for (String config : VALID_UNQUOTED_HEADER_CONFIGS) {
               assertValidHeaderConfig(WHITESPACE + config + WHITESPACE);
           }
           for (String config : VALID_QUOTE_REQUIRED_HEADER_CONFIGS) {
               assertValidHeaderConfig(WHITESPACE + "\"" + WHITESPACE + config + WHITESPACE + "\"" + WHITESPACE);
           }
       }
   
       @Test
       public void testMultipleValidHeaderConfigsWithoutWhitespace() {
           assertValidHeaderConfig(String.join(", ", VALID_UNQUOTED_HEADER_CONFIGS));
           assertValidHeaderConfig(String.join(" , ", VALID_UNQUOTED_HEADER_CONFIGS));
       }
   
       @Test
       public void testHeaderConfigsThatRequireQuotes() {
           for (String config : VALID_QUOTE_REQUIRED_HEADER_CONFIGS) {
               assertInvalidHeaderConfig(config);
           }
       }
   
       @Test
       public void testInvalidHeaderConfigs() {
           for (String config : INVALID_UNQUOTED_HEADER_CONFIGS) {
               assertInvalidHeaderConfig(config);
           }
       }
   
       @Test
       public void testOneInvalidAndMultipleValidHeaderConfigs() {
           assertInvalidHeaderConfig(String.join(", ", VALID_UNQUOTED_HEADER_CONFIGS)
                                     + ", " + INVALID_UNQUOTED_HEADER_CONFIGS.get(0));
           assertInvalidHeaderConfig(INVALID_UNQUOTED_HEADER_CONFIGS.get(0) + ", "
                                     + String.join(", ", VALID_UNQUOTED_HEADER_CONFIGS));
       }
   
       protected void assertValidHeaderConfig(String config) {
           WorkerConfig.validateHttpResponseHeaderConfig(config);
           // any valid config should be valid per HeaderFilter
           configureHeaderFilter(config);
       }
   
       protected void assertInvalidHeaderConfig(String config) {
           assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config));
           // any invalid config should also be invalid per HeaderFilter
           assertThrows(ConfigException.class, () -> configureHeaderFilter(config));
       }
   
       protected void configureHeaderFilter(String headerConfig) {
           FilterHolder headerFilterHolder = new FilterHolder(HeaderFilter.class);
           headerFilterHolder.setInitParameter("headerConfig", headerConfig);
           try {
               try {
                   headerFilterHolder.doStart();
                   headerFilterHolder.initialize();
               } finally {
                   headerFilterHolder.doStop();
               }
           } catch (Exception e) {
               // wrap in ConfigException to keep the test simple
               throw new ConfigException("HeaderFilter failure", e);
           }
       }
   ```







[GitHub] [kafka] rhauch commented on pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#issuecomment-633060237


   retest this please

