You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/23 19:21:30 UTC

[kafka] branch trunk updated: KAFKA-8407: Fix validation of class and list configs in connector client overrides (#6789)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5351efe  KAFKA-8407: Fix validation of class and list configs in connector client overrides (#6789)
5351efe is described below

commit 5351efe48ac71318fbbca21ac280c43921267744
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Thu May 23 12:21:19 2019 -0700

    KAFKA-8407: Fix validation of class and list configs in connector client overrides (#6789)
    
    Because of how config values are converted into strings in the `AbstractHerder.validateClientOverrides()` method after being validated by the client override policy, an exception is thrown if the value returned by the policy isn't already parsed as the type expected by the client `ConfigDef`. The fix here involves parsing client override properties before passing them to the override policy.
    
    A unit test is added to ensure that several different types of configs are validated properly by the herder.
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Magesh Nandakumar <ma...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
 .../kafka/connect/runtime/AbstractHerder.java      | 11 +++-
 .../kafka/connect/runtime/AbstractHerderTest.java  | 64 ++++++++++++++++++++--
 2 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index e92f55e..f66029c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -403,7 +403,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         List<ConfigInfo> configInfoList = new LinkedList<>();
         Map<String, ConfigKey> configKeys = configDef.configKeys();
         Set<String> groups = new LinkedHashSet<>();
-        Map<String, Object> clientConfigs = connectorConfig.originalsWithPrefix(prefix);
+        Map<String, Object> clientConfigs = new HashMap<>();
+        for (Map.Entry<String, Object> rawClientConfig : connectorConfig.originalsWithPrefix(prefix).entrySet()) {
+            String configName = rawClientConfig.getKey();
+            Object rawConfigValue = rawClientConfig.getValue();
+            ConfigKey configKey = configDef.configKeys().get(configName);
+            Object parsedConfigValue = configKey != null
+                ? ConfigDef.parseType(configName, rawConfigValue, configKey.type)
+                : rawConfigValue;
+            clientConfigs.put(configName, parsedConfigValue);
+        }
         ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
             connName, connectorType, connectorClass, clientConfigs, clientType);
         List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 35c0dd2..2c341bc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -16,18 +16,22 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
@@ -56,6 +60,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.powermock.api.easymock.PowerMock.verifyAll;
 import static org.powermock.api.easymock.PowerMock.replayAll;
@@ -364,14 +369,12 @@ public class AbstractHerderTest {
         AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy());
         replayAll();
 
-        // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
-        // class info that should generate an error.
         Map<String, String> config = new HashMap<>();
         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
         config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
         config.put("required", "value"); // connector required config
-        String ackConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + ProducerConfig.ACKS_CONFIG;
-        String saslConfigKey = ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + SaslConfigs.SASL_JAAS_CONFIG;
+        String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG);
+        String saslConfigKey = producerOverrideKey(SaslConfigs.SASL_JAAS_CONFIG);
         config.put(ackConfigKey, "none");
         config.put(saslConfigKey, "jaas_config");
 
@@ -400,6 +403,55 @@ public class AbstractHerderTest {
     }
 
     @Test
+    public void testConfigValidationAllOverride() {
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new AllConnectorClientConfigOverridePolicy());
+        replayAll();
+
+        Map<String, String> config = new HashMap<>();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+        config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
+        config.put("required", "value"); // connector required config
+        // Try to test a variety of configuration types: string, int, long, boolean, list, class
+        String protocolConfigKey = producerOverrideKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
+        config.put(protocolConfigKey, "SASL_PLAINTEXT");
+        String maxRequestSizeConfigKey = producerOverrideKey(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
+        config.put(maxRequestSizeConfigKey, "420");
+        String maxBlockConfigKey = producerOverrideKey(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+        config.put(maxBlockConfigKey, "28980");
+        String idempotenceConfigKey = producerOverrideKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+        config.put(idempotenceConfigKey, "true");
+        String bootstrapServersConfigKey = producerOverrideKey(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        config.put(bootstrapServersConfigKey, "SASL_PLAINTEXT://localhost:12345,SASL_PLAINTEXT://localhost:23456");
+        String loginCallbackHandlerConfigKey = producerOverrideKey(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
+        config.put(loginCallbackHandlerConfigKey, OAuthBearerUnsecuredLoginCallbackHandler.class.getName());
+
+        final Set<String> overriddenClientConfigs = new HashSet<>();
+        overriddenClientConfigs.add(protocolConfigKey);
+        overriddenClientConfigs.add(maxRequestSizeConfigKey);
+        overriddenClientConfigs.add(maxBlockConfigKey);
+        overriddenClientConfigs.add(idempotenceConfigKey);
+        overriddenClientConfigs.add(bootstrapServersConfigKey);
+        overriddenClientConfigs.add(loginCallbackHandlerConfigKey);
+
+        ConfigInfos result = herder.validateConnectorConfig(config);
+        assertEquals(herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), ConnectorType.SOURCE);
+
+        Map<String, String> validatedOverriddenClientConfigs = new HashMap<>();
+        for (ConfigInfo configInfo : result.values()) {
+            String configName = configInfo.configKey().name();
+            if (overriddenClientConfigs.contains(configName)) {
+                validatedOverriddenClientConfigs.put(configName, configInfo.configValue().value());
+            }
+        }
+        Map<String, String> rawOverriddenClientConfigs = config.entrySet().stream()
+            .filter(e -> overriddenClientConfigs.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        assertEquals(rawOverriddenClientConfigs, validatedOverriddenClientConfigs);
+        verifyAll();
+    }
+
+    @Test
     public void testReverseTransformConfigs() {
         // Construct a task config with constant values for TEST_KEY and TEST_KEY2
         Map<String, String> newTaskConfig = new HashMap<>();
@@ -482,4 +534,8 @@ public class AbstractHerderTest {
 
     private abstract class BogusSourceTask extends SourceTask {
     }
+
+    private static String producerOverrideKey(String config) {
+        return ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + config;
+    }
 }