You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/02/11 15:16:53 UTC

[kafka] branch trunk updated: KAFKA-13306: Null connector config value passes validation, but fails creation (#11333)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 03af63d  KAFKA-13306: Null connector config value passes validation, but fails creation (#11333)
03af63d is described below

commit 03af63d0761cff0cc895f65825829eb6314c0de1
Author: lhunyady <83...@users.noreply.github.com>
AuthorDate: Fri Feb 11 16:14:06 2022 +0100

    KAFKA-13306: Null connector config value passes validation, but fails creation (#11333)
    
    This patch adds null value check to the connector config validation, and extends unit tests to cover this functionality.
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Chris Egerton <ch...@confluent.io>, Boyang Chen <bc...@outlook.com>, Andras Katona <ak...@cloudera.com>
---
 .../kafka/connect/runtime/AbstractHerder.java      |  7 +++
 .../connect/runtime/rest/entities/ConfigInfo.java  |  2 +-
 .../kafka/connect/runtime/AbstractHerderTest.java  | 52 ++++++++++++++++++++++
 .../resources/ConnectorPluginsResourceTest.java    |  2 +-
 4 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 30555ef..89020a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -442,6 +442,13 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
                     enrichedConfigDef,
                     connectorProps
             );
+            connectorProps.entrySet().stream()
+                .filter(e -> e.getValue() == null)
+                .map(Map.Entry::getKey)
+                .forEach(prop ->
+                    validatedConnectorConfig.computeIfAbsent(prop, ConfigValue::new)
+                        .addErrorMessage("Null value can not be supplied as the configuration value.")
+            );
             List<ConfigValue> configValues = new ArrayList<>(validatedConnectorConfig.values());
             Map<String, ConfigKey> configKeys = new LinkedHashMap<>(enrichedConfigDef.configKeys());
             Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
index 49a2f6f..06e6153 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
@@ -60,6 +60,6 @@ public class ConfigInfo {
 
     @Override
     public String toString() {
-        return "[" + configKey.toString() + "," + configValue.toString() + "]";
+        return "[" + configKey + "," + configValue + "]";
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 8c8d00d..360c4ae 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -57,6 +57,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -289,6 +290,46 @@ public class AbstractHerderTest {
         assertFalse(mayBeRestartPlan.isPresent());
     }
 
+    @Test()
+    public void testConfigValidationNullConfig() {
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+        replayAll();
+
+        Map<String, String> config = new HashMap<>();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+        config.put("name", "somename");
+        config.put("required", "value");
+        config.put("testKey", null);
+
+        final ConfigInfos configInfos = herder.validateConnectorConfig(config, false);
+
+        assertEquals(1, configInfos.errorCount());
+        assertErrorForKey(configInfos, "testKey");
+
+        verifyAll();
+    }
+
+    @Test
+    public void testConfigValidationMultipleNullConfig() {
+        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+        replayAll();
+
+        Map<String, String> config = new HashMap<>();
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+        config.put("name", "somename");
+        config.put("required", "value");
+        config.put("testKey", null);
+        config.put("secondTestKey", null);
+
+        final ConfigInfos configInfos = herder.validateConnectorConfig(config, false);
+
+        assertEquals(2, configInfos.errorCount());
+        assertErrorForKey(configInfos, "testKey");
+        assertErrorForKey(configInfos, "secondTestKey");
+
+        verifyAll();
+    }
+
     @Test
     public void testBuildRestartPlanForConnectorAndTasks() {
         RestartRequest restartRequest = new RestartRequest(connector, false, true);
@@ -702,6 +743,17 @@ public class AbstractHerderTest {
         assertFalse(reverseTransformed.get(0).containsKey(TEST_KEY3));
     }
 
+    private void assertErrorForKey(ConfigInfos configInfos, String testKey) {
+        final List<String> errorsForKey = configInfos.values().stream()
+                .map(ConfigInfo::configValue)
+                .filter(configValue -> configValue.name().equals(testKey))
+                .map(ConfigValueInfo::errors)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+        assertEquals(1, errorsForKey.size());
+    }
+
     @Test
     public void testConfigProviderRegex() {
         testConfigProviderRegex("\"${::}\"");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index c1d06f9..4797297 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -97,7 +97,7 @@ public class ConnectorPluginsResourceTest {
 
         props = new HashMap<>(partialProps);
         props.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName());
-        props.put("plugin.path", null);
+        props.put("plugin.path", "test.path");
     }
 
     private static final ConfigInfos CONFIG_INFOS;