You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/02/11 15:16:53 UTC
[kafka] branch trunk updated: KAFKA-13306: Null connector config value passes validation, but fails creation (#11333)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 03af63d KAFKA-13306: Null connector config value passes validation, but fails creation (#11333)
03af63d is described below
commit 03af63d0761cff0cc895f65825829eb6314c0de1
Author: lhunyady <83...@users.noreply.github.com>
AuthorDate: Fri Feb 11 16:14:06 2022 +0100
KAFKA-13306: Null connector config value passes validation, but fails creation (#11333)
This patch adds null value check to the connector config validation, and extends unit tests to cover this functionality.
Reviewers: Mickael Maison <mi...@gmail.com>, Chris Egerton <ch...@confluent.io>, Boyang Chen <bc...@outlook.com>, Andras Katona <ak...@cloudera.com>
---
.../kafka/connect/runtime/AbstractHerder.java | 7 +++
.../connect/runtime/rest/entities/ConfigInfo.java | 2 +-
.../kafka/connect/runtime/AbstractHerderTest.java | 52 ++++++++++++++++++++++
.../resources/ConnectorPluginsResourceTest.java | 2 +-
4 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 30555ef..89020a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -442,6 +442,13 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
enrichedConfigDef,
connectorProps
);
+ connectorProps.entrySet().stream()
+ .filter(e -> e.getValue() == null)
+ .map(Map.Entry::getKey)
+ .forEach(prop ->
+ validatedConnectorConfig.computeIfAbsent(prop, ConfigValue::new)
+ .addErrorMessage("Null value can not be supplied as the configuration value.")
+ );
List<ConfigValue> configValues = new ArrayList<>(validatedConnectorConfig.values());
Map<String, ConfigKey> configKeys = new LinkedHashMap<>(enrichedConfigDef.configKeys());
Set<String> allGroups = new LinkedHashSet<>(enrichedConfigDef.groups());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
index 49a2f6f..06e6153 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
@@ -60,6 +60,6 @@ public class ConfigInfo {
@Override
public String toString() {
- return "[" + configKey.toString() + "," + configValue.toString() + "]";
+ return "[" + configKey + "," + configValue + "]";
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 8c8d00d..360c4ae 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -57,6 +57,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -289,6 +290,46 @@ public class AbstractHerderTest {
assertFalse(mayBeRestartPlan.isPresent());
}
+ @Test()
+ public void testConfigValidationNullConfig() {
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+ replayAll();
+
+ Map<String, String> config = new HashMap<>();
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+ config.put("name", "somename");
+ config.put("required", "value");
+ config.put("testKey", null);
+
+ final ConfigInfos configInfos = herder.validateConnectorConfig(config, false);
+
+ assertEquals(1, configInfos.errorCount());
+ assertErrorForKey(configInfos, "testKey");
+
+ verifyAll();
+ }
+
+ @Test
+ public void testConfigValidationMultipleNullConfig() {
+ AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+ replayAll();
+
+ Map<String, String> config = new HashMap<>();
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
+ config.put("name", "somename");
+ config.put("required", "value");
+ config.put("testKey", null);
+ config.put("secondTestKey", null);
+
+ final ConfigInfos configInfos = herder.validateConnectorConfig(config, false);
+
+ assertEquals(2, configInfos.errorCount());
+ assertErrorForKey(configInfos, "testKey");
+ assertErrorForKey(configInfos, "secondTestKey");
+
+ verifyAll();
+ }
+
@Test
public void testBuildRestartPlanForConnectorAndTasks() {
RestartRequest restartRequest = new RestartRequest(connector, false, true);
@@ -702,6 +743,17 @@ public class AbstractHerderTest {
assertFalse(reverseTransformed.get(0).containsKey(TEST_KEY3));
}
+ private void assertErrorForKey(ConfigInfos configInfos, String testKey) {
+ final List<String> errorsForKey = configInfos.values().stream()
+ .map(ConfigInfo::configValue)
+ .filter(configValue -> configValue.name().equals(testKey))
+ .map(ConfigValueInfo::errors)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+
+ assertEquals(1, errorsForKey.size());
+ }
+
@Test
public void testConfigProviderRegex() {
testConfigProviderRegex("\"${::}\"");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index c1d06f9..4797297 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -97,7 +97,7 @@ public class ConnectorPluginsResourceTest {
props = new HashMap<>(partialProps);
props.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName());
- props.put("plugin.path", null);
+ props.put("plugin.path", "test.path");
}
private static final ConfigInfos CONFIG_INFOS;